1

I'm trying to build a WebSocket Server using the C++ Boost implementation of Websockets and used this code as a template for my project: https://www.boost.org/doc/libs/1_80_0/libs/beast/example/websocket/server/async/websocket_server_async.cpp

The template echoes any message the server receives back to the client. I adapted the code in a way that it sends a test message to every client connected to the server. In order to achieve this, I implemented a list of sessions and added a pointer to this list to the listener and the session itself. You will find the adaptions below.

For the broadcast itself I'm basically reusing the example's echo code combined with the list of sessions. The code executed is this:

void send_message(std::string message) {
    buffer_.consume(buffer_.size());
    auto buffer_data = buffer_.prepare(message.size());
    std::copy(message.begin(), message.end(), boost::asio::buffer_cast<char*>(buffer_data));
    buffer_.commit(message.size());

    ws_.async_write(
        buffer_.data(),
        beast::bind_front_handler(
            &session::on_write,
            shared_from_this()));
}

void broadcast_message(std::string message) {
    std::cout << "sessions_ size: " << sessions_->size() << std::endl;

    for (const auto& session_ptr : *sessions_) {
        session_ptr->send_message(message);
    }
}

void on_read(beast::error_code ec, std::size_t bytes_transferred) {
    boost::ignore_unused(bytes_transferred);

    if(ec == websocket::error::closed) return;

    if(ec) return fail(ec, "read");

    for (const auto& session_ptr : *sessions_) {
        session_ptr->send_message("test");
    }
}

This all works fine with a single client connected to the server. With multiple clients however the broadcast gets executed, all messages arrive as intended but as soon as the on_read() code block finishes this assert in the soft_mutex.hpp goes off:

template<class T>
bool
try_lock(T const*)
{
    // If this assert goes off it means you are attempting to
    // simultaneously initiate more than one of same asynchronous
    // operation, which is not allowed. For example, you must wait
    // for an async_read to complete before performing another
    // async_read.
    //
    BOOST_ASSERT(id_ != T::id);
    if(id_ != 0)
        return false;
    id_ = T::id;
    return true;
}

I don't understand why it does. All my async_write operations finish successfully as far as I can tell and I started the sever with one thread. Any help would be greatly appreciated!

This is the adaption to the main method I made to main to pass the sessions list:

std::list<std::shared_ptr<session>> sessions;
std::make_shared<listener>(ioc, tcp::endpoint{address, port}, &sessions)->run();

The sessions are put into the list like this:

auto new_session = std::make_shared<session>(std::move(socket), sessions_);
sessions_->push_back(new_session);
new_session->run();

This is the error message:

/usr/include/boost/beast/websocket/detail/soft_mutex.hpp:89: bool boost::beast::websocket::detail::soft_mutex::try_lock(const T*) [with T = boost::beast::websocket::stream<boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::execution::any_executor<boost::asio::execution::context_as_t<boost::asio::execution_context&>, boost::asio::execution::detail::blocking::never_t<0>, boost::asio::execution::prefer_only<boost::asio::execution::detail::blocking::possibly_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::tracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::untracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::fork_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::continuation_t<0> > >, boost::beast::unlimited_rate_policy> >::read_some_op<boost::beast::websocket::stream<boost::beast::basic_stream<boost::asio::ip::tcp, boost::asio::execution::any_executor<boost::asio::execution::context_as_t<boost::asio::execution_context&>, boost::asio::execution::detail::blocking::never_t<0>, boost::asio::execution::prefer_only<boost::asio::execution::detail::blocking::possibly_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::tracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::outstanding_work::untracked_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::fork_t<0> >, boost::asio::execution::prefer_only<boost::asio::execution::detail::relationship::continuation_t<0> > >, boost::beast::unlimited_rate_policy> >::read_op<boost::beast::detail::bind_front_wrapper<void (session::*)(boost::system::error_code, long unsigned int), std::shared_ptr<session> >, boost::beast::basic_flat_buffer<std::allocator<char> > >, boost::asio::mutable_buffer>]: Assertion `id_ != T::id' failed.
sehe
  • 374,641
  • 47
  • 450
  • 633
Mir4culix
  • 81
  • 7

1 Answers1

0

on_accept starts a read-chain:

enter image description here

When you add your send_message, it also starts a read_chain for any other session, which all already run their own:

enter image description here

Fixing?

Note that you're also not paying attention to strands. send_message should be posting the write operations to the sessions strand.

The way to keep a single read and single write chain is to have an outbound queue. I have many examples on this site, where I usually call that queue the outbox (_outbox, m_outbox, outbox_). I recently applied to a websocket application as well: Boost Beast Async Websocket Server How to interface with session?

Note how in that example I also show a session-list with better encapsulation: it belongs to the listener, and should contain weak pointers to avoid lifetime/shutdown deadlock issues.

Of course, your application seems more like a chat server - with omni-directional broadcasting of messages - so perhaps look at how https://www.boost.org/doc/libs/1_81_0/libs/beast/example/websocket/server/chat-multi/ does it. There's a live demo of it as well:


sehe
  • 374,641
  • 47
  • 450
  • 633