2

tl;dr: Is there a way to close a WebSocket that's currently doing (sync) read() operation, if server sends nothing for some time?

I wanted to make a simple WebSocket client with Boost::beast. When I realized that read() is a blocking operation, and that there's no way to tell if there's a message coming - I created a sleeper thread. All the thread does is read() and I can afford to have it blocked if no data is coming.

I want it to be able to close the connection so from non-blocked thread I shoot a websocket::close(). This causes read() to throw a BOOST_ASSERT() at me:

Assertion failed: ! impl.wr_close

How can I close the connection when (sync) read() is ongoing?

Code for reproduction of my scenario:


#include <string>
#include <thread>
#include <chrono>

#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>


using namespace std::chrono_literals;

class HandlerThread {

    enum class Status {
        UNINITIALIZED,
        DISCONNECTED,
        CONNECTED,
        READING,
    };

    const std::string _host;
    const std::string _port;
    std::string _resolvedAddress;

    boost::asio::io_context        _ioc;
    boost::asio::ip::tcp::resolver _resolver;
    boost::beast::websocket::stream<boost::asio::ip::tcp::socket> _websocket;

    boost::beast::flat_buffer _buffer;

    bool isRunning = true;
    Status _connectionStatus = Status::UNINITIALIZED;

public:
    HandlerThread(const std::string& host, const uint16_t port)
    : _host(std::move(host))
    , _port(std::to_string(port))
    , _ioc()
    , _resolver(_ioc)
    , _websocket(_ioc) {}

    void Run() {
        // isRunning is also useless, due to blocking boost::beast operations.
        while(isRunning) {
            switch (_connectionStatus) {
                case Status::UNINITIALIZED:
                case Status::DISCONNECTED:
                    if (!connect()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
                case Status::CONNECTED:
                case Status::READING:
                    if (!read()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
            }
        }
    }

    void Close()
    {
         isRunning = false;
        _websocket.close(boost::beast::websocket::close_code::normal);
    }

private:
    bool connect()
    {
        // All here is copy-paste from the examples.
        boost::system::error_code errorCode;
        // Look up the domain name  
        auto const results = _resolver.resolve(_host, _port, errorCode);
        if (errorCode) return false;

        // Make the connection on the IP address we get from a lookup
        auto ep = boost::asio::connect(_websocket.next_layer(), results, errorCode);
        if (errorCode) return false;

        _resolvedAddress = _host + ':' + std::to_string(ep.port());

        _websocket.set_option(boost::beast::websocket::stream_base::decorator(
            [](boost::beast::websocket::request_type& req)
            {
                req.set(boost::beast::http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-coro");
            }));

        boost::beast::websocket::response_type res;
        _websocket.handshake(res, _resolvedAddress, "/", errorCode);

        if (errorCode) return false;

        _connectionStatus = Status::CONNECTED;
        return true;
    }

    bool read()
    {
        boost::system::error_code errorCode;
        _websocket.read(_buffer, errorCode);

        if (errorCode) return false;

        if (_websocket.is_message_done()) {
            _connectionStatus = Status::CONNECTED;
            // notifyRead(_buffer);
            _buffer.clear();    
        } else {
            _connectionStatus = Status::READING;
        }

        return true;
    }
};

int main() {
    HandlerThread handler("localhost", 8080);
    std::thread([&]{
        handler.Run();
    }).detach(); // bye!

    std::this_thread::sleep_for(3s);
    handler.Close(); // Bad idea...

    return 0;
}
Latawiec
  • 341
  • 2
  • 10

1 Answers1

1

There is no such thing. You might be able to force something at the TCP stack (so, operating system, usually) level. E.g. disabling the network interface involved.

Note that most synchronous code can be trivially be transformed into asynchronous code with the exact same blocking semantics using asio::use_future. That means you can use async deadlines. And those are supported by beast out of the box (basic your websocket on beast::tcp_stream instead of asio::ip::tcp::socket)

UPDATE

To the added code

Review/Simplify

Reduced the code removing unneeded bits and adding some fixes and demonstration handler notification so we can test the functioning:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;

class WsConnect {
    using tcp = net::ip::tcp;
    const std::string              _host, _port;
    net::io_context                _ioc;
    beast::flat_buffer             _buffer;
    websocket::stream<tcp::socket> _ws{_ioc};

    enum class Status { DISCONNECTED, CONNECTED } _status{Status::DISCONNECTED};
    std::atomic_bool _running{true}; // SEHE
  public:
    boost::signals2::signal<void(std::string)> callback; // SEHE

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    void Run() {
        while (_running)
            try {
                switch (_status) {
                    case Status::DISCONNECTED: do_connect(); [[fallthrough]];
                    case Status::CONNECTED: do_read();
                }
            } catch (boost::system::system_error const& se) {
                // se.code() is the error_code
                _status = Status::DISCONNECTED;
                std::this_thread::sleep_for(50ms); // SEHE avoid tight reconnect loop
            }
    }

    void Close() {
        _running = false;
        beast::error_code ec;
        _ws.close(websocket::close_code::normal, ec);
    }

  private:
    void do_connect() {
        connect(beast::get_lowest_layer(_ws), tcp::resolver(_ioc).resolve(_host, _port));

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        _ws.handshake(_host + ':' + _port, "/");
        _buffer.clear(); // SEHE this was missing
    }

    void do_read() {
        do {
            _ws.read(_buffer);
        } while (!_ws.is_message_done());

        callback(beast::buffers_to_string(_buffer.cdata())); // SEHE just for demo
        _buffer.clear();
    }
};

#include <iomanip>
#include <iostream>
void handle_message(std::string_view msg) { std::cout << "Handling " << quoted(msg) << std::endl; }

int main() {
    WsConnect conn("localhost", 8989);
    std::thread([&] { conn.Run(); }).detach(); // bye!
    boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);

    std::this_thread::sleep_for(3s);
    conn.Close(); // Bad idea...
}

Exercising it for demonstration:

enter image description here

Design Issues

I'd argue there are three design issues:

  • blocking IO operations do not afford cancellation
  • HandlerThread is not a thread
  • detach()-ing threads makes them ungovernable by definition.

Here's a fix, naively only time-limiting operations like in your example:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;

struct WsConnect {
    boost::signals2::signal<void(std::string)> callback;

    WsConnect(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    void Run(std::chrono::steady_clock::duration d) {
        _ws.next_layer().expires_after(d);
        for (;; std::this_thread::sleep_for(50ms))
            try {
                do_connect();
                for (;;)
                    do_read();
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
                if (se.code() == beast::error::timeout)
                    break;
            }
    }

  private:
    using tcp = net::ip::tcp;
    const std::string                    _host, _port;
    beast::flat_buffer                   _buffer;
    net::thread_pool                     _ioc{1};
    websocket::stream<beast::tcp_stream> _ws{_ioc};

    void do_connect() {
        _ws.next_layer()
            .async_connect(tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
            .get();

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
        _buffer.clear();
    }

    void do_read() {
        do
            _ws.async_read(_buffer, net::use_future).get();
        while (!_ws.is_message_done());

        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};

void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }

int main() {
    WsConnect conn("localhost", 8989);
    boost::signals2::scoped_connection subscription = conn.callback.connect(handle_message);
    conn.Run(3s);
} // close implied by destructors

Note how the code got simpler, shorter, and we even print error information. It's achieved by using use_future together with beast::tcp_stream::expires_after:

enter image description here

Full Cancellation

To allow for /externally triggered/ cancellation (instead of a fixed deadline), we can cheat a little by using 2 threads, so one can be "clogged" doing blocking waits on the futures:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;

struct BackgroundWs {
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    void Start() {
        post(_ws.get_executor(), [this] { do_run(); });
    }

    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

  private:
    using tcp = net::ip::tcp;
    const std::string              _host, _port;
    beast::flat_buffer             _buffer;
    net::thread_pool               _ioc{2};
    websocket::stream<tcp::socket> _ws{_ioc};
    bool                           _stopped{false};

    void do_run() {
        for (; !_stopped; std::this_thread::sleep_for(50ms))
            try {
                do_connect();
                for (;;)
                    do_read();
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
    }

    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stopped = true;
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    void do_connect() {
        async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port), net::use_future)
            .get();

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        _ws.async_handshake(_host + ':' + _port, "/", net::use_future).get();
        _buffer.clear();
    }

    void do_read() {
        do
            _ws.async_read(_buffer, net::use_future).get();
        while (!_ws.is_message_done());

        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};

void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }

int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);
        client.Start();

        std::string input;
        while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
            if (input == "Stop") break;

        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}

Now, the thing runs, reconnecting happily until the user enters Stop on the terminal:

enter image description here

No Cheats, C++20

In C++ using coro's you can have basically identical code being true async. This gets rid of the "cheating extra thread":

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;

struct BackgroundWs {
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    void Start() { net::co_spawn(_ws.get_executor(), do_run(), net::detached); }
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

  private:
    using tcp = net::ip::tcp;
    const std::string              _host, _port;
    beast::flat_buffer             _buffer;
    net::thread_pool               _ioc{1};
    websocket::stream<tcp::socket> _ws{_ioc};
    bool                           _stopped{false};

    net::awaitable<void> do_run() {
        for (; !_stopped; co_await async_sleep(50ms))
            try {
                co_await do_connect();
                for (;;)
                    co_await do_read();
            } catch (boost::system::system_error const& se) {
                std::cerr << "Error: " << se.code().message() << std::endl;
            }
    }

    net::awaitable<void> async_sleep(auto duration) {
        co_await net::steady_timer(_ws.get_executor(), duration).async_wait(net::use_awaitable);
    }

    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stopped = true;
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    net::awaitable<void> do_connect() {
        co_await async_connect(_ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
                               net::use_awaitable);

        _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
            req.set(beast::http::field::user_agent, BOOST_BEAST_VERSION_STRING " WsConnect");
        }));

        co_await _ws.async_handshake(_host + ':' + _port, "/", net::use_awaitable);
        _buffer.clear();
    }

    net::awaitable<void> do_read() {
        do
            co_await _ws.async_read(_buffer, net::use_awaitable);
        while (!_ws.is_message_done());

        callback(beast::buffers_to_string(_buffer.cdata()));
        _buffer.clear();
    }
};

void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }

int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);

        client.Start();

        std::string input;
        while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
            if (input == "Stop") break;

        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}

Not Using C++20

This is conceptually identical, but more tedious as it requires explicit callback functions:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/signals2.hpp>
#include <iostream>
namespace net       = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using namespace std::chrono_literals;

struct BackgroundWs {
    boost::signals2::signal<void(std::string)> callback;

    BackgroundWs(std::string host, uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port)) {}

    void Start() {
        net::post(_ws.get_executor(), [this] {
            _stop_requested = false;
            do_step_machine();
        });
    }
    void Stop() { do_stop().get(); }

    ~BackgroundWs() {
        do_stop().wait(); // noexcept, please
    }

  private:
    using tcp = net::ip::tcp;
    using error_code = boost::system::error_code;
    const std::string              _host, _port;
    beast::flat_buffer             _buffer;
    net::thread_pool               _ioc{1};
    websocket::stream<tcp::socket> _ws{_ioc};
    net::steady_timer              _timer{_ws.get_executor()};

    bool _stop_requested = false;
    enum class Status { INIT, CONNECTED, DISCONNECTED, STOPPED } _status{};

    void handle(error_code ec) {
        std::cerr << "Error: " << ec.message() << std::endl;
        if (ec.failed())
            _status = _stop_requested ? Status::STOPPED : Status::DISCONNECTED;
        do_step_machine();
    }

    void do_step_machine() {
        switch(_status) {
            case Status::INIT:         return do_connect();
            case Status::CONNECTED:    return do_read();
            case Status::DISCONNECTED: return do_reconnect_delay(50ms);
            case Status::STOPPED:      break;
        };
    }

    void do_reconnect_delay(std::chrono::steady_clock::duration d) {
        _timer.expires_after(d);
        _timer.async_wait([this](error_code ec) {
            if (ec) return handle(ec);
            _status = Status::INIT;
            do_step_machine();
        });
    }

    std::future<void> do_stop() {
        return dispatch(_ws.get_executor(), std::packaged_task<void()>([this] {
                            _stop_requested = true;
                            _ws.next_layer().cancel();
                            _ws.close(websocket::normal);
                        }));
    }

    void do_connect() {
        async_connect( //
            _ws.next_layer(), tcp::resolver(_ioc).resolve(_host, _port),
            [this](error_code ec, tcp::endpoint /*ep*/) {
                if (ec) return handle(ec);

                _ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) {
                    req.set(beast::http::field::user_agent,
                            BOOST_BEAST_VERSION_STRING " WsConnect");
                }));

                _ws.async_handshake(_host + ':' + _port, "/", [this](error_code ec) {
                    if (ec) return handle(ec);
                    _status = Status::CONNECTED;
                    _buffer.clear();
                    do_step_machine();
                });
            });
    }

    void do_read() {
        _ws.async_read(_buffer, [this](error_code ec, size_t) {
            if (ec) return handle(ec);

            if (_ws.is_message_done()) {
                callback(beast::buffers_to_string(_buffer.cdata()));
                _buffer.clear();
                do_step_machine();
            } else {
                do_read();
            }
        });
    }
};

void handle_message(std::string_view msg) { std::cout << "Handling " << msg << std::flush; }

int main() {
    {
        BackgroundWs client("localhost", 8989);
        boost::signals2::scoped_connection subscription = client.callback.connect(handle_message);

        client.Start();

        std::string input;
        while (std::cout << "Enter 'Stop' to stop... " && getline(std::cin, input))
            if (input == "Stop") break;

        // client.Stop(); // or just rely on destructor
    } // destructor

    std::cout << "Press Enter to quit... ";
    std::cin.ignore(1024, '\n');
}

Again, same behaviour:

enter image description here

sehe
  • 374,641
  • 47
  • 450
  • 633
  • I don't quite get it. I am aware I can use the async_* set of functions and provide them with my own timeout. What do I need `beast::tcp_stream` for? Or are these two things absolutely unrelated? – Latawiec Mar 15 '23 at 22:14
  • @Latawiec It [has the deadline timer built-in](https://www.boost.org/doc/libs/1_81_0/libs/beast/doc/html/beast/ref/boost__beast__tcp_stream.html#:~:text=expires_after-,Set%20the%20timeout,-for%20the%20next), for your convenience. You know, if you *had* included code, I could have shown you, instead of just telling you :) – sehe Mar 15 '23 at 22:20
  • Be my guest! I will remove anything that's not in question and add the code to the question, 5 min tops. – Latawiec Mar 15 '23 at 22:28
  • Code added, I checked and it builds and also fails on close() – Latawiec Mar 15 '23 at 22:45
  • Ok I think I figured how `asio::use_future` is supposed to work. But I don't get one thing - where is my `errorCode`? It looks like if I want to use `asio::use_future` I also have to do error handling through exceptions, that's weird. I think what I'm gonna do instead is provide a handler to async_* calls, and set my own future with some ErrorOr struct. – Latawiec Mar 16 '23 at 12:39
  • 1
    @Latawiec embrace exceptions, use as_tuple or use redirect_error. I'd embrace exceptions, like [in your own code](http://coliru.stacked-crooked.com/a/ce2ef1343d39eca2). I'm close to updating the answer with some alternatives. – sehe Mar 16 '23 at 13:03
  • 1
    And all versions up. I'd take my time looking at the (subtle) differences and points. The last version is most tedious but perhaps appeals most to you because it rehabilitates the full state machine (in fact introducing more states as required for smarter reconnect timing e.g.). – sehe Mar 16 '23 at 15:26
  • Thank you for your time spent on this! It looks amazing! I detached thread hoping that I can notify "it's time to stop" and on the next spin of the loop the thread would gracefully exit. So plan was that once ~HandlerThread() is called, thread will exit "as soon as possible", but no rush. Destructor caller will be far far ahead of that, maybe creating a new HandlerThread and start a new detached thread. Kind of swapping connection in runtime. I know my sample code didn't really enclose this intent though. – Latawiec Mar 16 '23 at 15:54
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/252556/discussion-between-latawiec-and-sehe). – Latawiec Mar 16 '23 at 22:06