
I'm working on a project that involves a boost::beast websocket/http mixed server, which runs on top of boost::asio. I've heavily based my project off the advanced_server.cpp example source.

It works fine, but right now I'm attempting to add a feature that requires sending a message to all connected clients.

I'm not very familiar with boost::asio, but right now I can't see any way to have something like "broadcast" events (if that's even the correct term).

My naive approach would be to see if I can have the construction of websocket_session() attach something like an event listener, and the destructor detach the listener. At that point, I could just fire the event and have all the currently valid websocket sessions (to which the lifetime of websocket_session() is scoped) execute a callback.

There is https://stackoverflow.com/a/17029022/268006, which does more or less what I want by (ab)using a boost::asio::steady_timer, but that seems like a kind of horrible hack to accomplish something that should be pretty straightforward.

Basically, given a stateful boost::asio server, how can I do an operation on multiple connections?

Fake Name

3 Answers


First off: You can broadcast UDP, but that's not to connected clients. That's just... UDP.

Secondly, that link shows how to have a condition-variable (event)-like interface in Asio. That's only a tiny part of your problem. You forgot about the big picture: you need to know about the set of open connections, one way or the other:

  1. e.g. keeping a container of session pointers (weak_ptr) to each connection
  2. each connection subscribing to a signal slot (e.g. Boost Signals).

Option 1. is great for performance, option 2. is better for flexibility (decoupling the event source from subscribers, making it possible to have heterogeneous subscribers, e.g. not from connections).

Because I think Option 1. is much simpler w.r.t. threading, better w.r.t. efficiency (you can e.g. serve all clients from one buffer without copying) and you probably don't need to doubly decouple the signal/slots, let me refer to an answer where I already showed as much for pure Asio (without Beast):

It shows the concept of a "connection pool" - which is essentially a thread-safe container of weak_ptr<connection> objects with some garbage collection logic.
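To make that concrete, here is a minimal, hypothetical sketch of such a pool; the class and member names (connection_pool, add, for_each, garbage_collect) are mine, not taken from the linked answer:

    // Hypothetical sketch: a thread-safe container of weak connection pointers
    // with simple garbage collection. All names here are illustrative.
    #include <algorithm>
    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <vector>

    template <typename Connection>
    class connection_pool {
      public:
        void add(std::weak_ptr<Connection> wp) {
            std::lock_guard<std::mutex> lk(_mx);
            garbage_collect();                 // drop expired entries while locked
            _items.push_back(std::move(wp));
        }

        template <typename F>
        std::size_t for_each(F f) {            // run f on every still-alive connection
            std::vector<std::shared_ptr<Connection>> alive;
            {
                std::lock_guard<std::mutex> lk(_mx);
                garbage_collect();
                for (auto const& wp : _items)
                    if (auto sp = wp.lock())
                        alive.push_back(std::move(sp));
            }
            for (auto const& sp : alive)
                f(*sp);                        // invoke outside the lock
            return alive.size();
        }

      private:
        void garbage_collect() {               // requires _mx to be held
            _items.erase(std::remove_if(_items.begin(), _items.end(),
                                        [](auto const& wp) { return wp.expired(); }),
                         _items.end());
        }

        std::mutex                             _mx;
        std::vector<std::weak_ptr<Connection>> _items;
    };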

Demonstration: Introducing Echo Server

After chatting about things, I wanted to take the time to actually demonstrate the two approaches, so it's completely clear what I'm talking about.

First let's present a simple, run-of-the-mill asynchronous TCP server with:

  • multiple concurrent connections
  • each connected session reads from the client line-by-line, and echoes the same back to the client
  • stops accepting after 3 seconds, and exits after the last client disconnects

master branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.set_option(tcp::acceptor::reuse_address()); // set before bind so it takes effect
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

  private:
    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 session->start();
                 accept_loop();
             }
        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(3s);
    s.stop(); // active connections will continue

    th.join();
}

Approach 1. Adding Broadcast Messages

So, let's add "broadcast messages" that get sent to all active connections simultaneously. We add two:

  • one at each new connection (saying "Player ## has entered the game")
  • one that emulates a global "server event", like you described in the question. It gets triggered from within main:

    std::this_thread::sleep_for(1s);
    
    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";
    

Note how we do this by registering a weak pointer to each accepted connection and operating on each:

    _acc.async_accept(session->_s, [this,session](error_code ec) {
         auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
         std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

         if (!ec) {
             auto n = reg_connection(session);

             session->start();
             accept_loop();

             broadcast("player #" + std::to_string(n) + " has entered the game\n");
         }

    });

broadcast is also used directly from main and is simply:

size_t broadcast(std::string const& msg) {
    return for_each_active([msg](connection& c) { c.send(msg, true); });
}

using-asio-post branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.set_option(tcp::acceptor::reuse_address()); // set before bind so it takes effect
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        return for_each_active([msg](connection& c) { c.send(msg, true); });
    }

  private:
    using connptr = std::shared_ptr<connection>;
    using weakptr = std::weak_ptr<connection>;

    std::mutex _mx;
    std::vector<weakptr> _registered;

    size_t reg_connection(weakptr wp) {
        std::lock_guard<std::mutex> lk(_mx);
        _registered.push_back(wp);
        return _registered.size();
    }

    template <typename F>
    size_t for_each_active(F f) {
        std::vector<connptr> active;
        {
            std::lock_guard<std::mutex> lk(_mx);
            for (auto& w : _registered)
                if (auto c = w.lock())
                    active.push_back(c);
        }

        for (auto& c : active) {
            std::cout << "(running action for " << c->_s.remote_endpoint() << ")" << std::endl;
            f(*c);
        }

        return active.size();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active connections\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

Approach 2: The Same Broadcasts, But With Boost Signals2

The Signals approach is a fine example of Dependency Inversion.

Most salient notes:

  • signal slots get invoked on the thread that invokes the signal ("raises the event")
  • the scoped_connection is there so subscriptions are automatically removed when the connection is destroyed
  • there's a subtle difference in the wording of the console message, from "reached # active connections" to "reached # active subscribers".

The difference is key to understanding the added flexibility: the signal owner/invoker does not know anything about the subscribers. That's the decoupling/dependency inversion we're talking about.
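To make the decoupling tangible, here is a purely illustrative snippet showing a subscriber that is not a connection at all (a logger). It assumes the broadcast signal is reachable from the subscribing code; in the listing below _broadcast_event is private to server, so treat this as a sketch only:

    // Hypothetical: any callable can subscribe, not just connections.
    // Note the slot runs on whatever thread fires the signal.
    boost::signals2::scoped_connection log_subscription =
        broadcast_event.connect([](std::string const& msg) {
            std::clog << "[broadcast] " << msg;
        });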

using-signals2 branch on github

#include <boost/asio.hpp>
#include <memory>
#include <list>
#include <iostream>
#include <boost/signals2.hpp>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;
using namespace std::chrono_literals;
using namespace std::string_literals;

static bool s_verbose = false;

struct connection : std::enable_shared_from_this<connection> {
    connection(ba::io_context& ioc) : _s(ioc) {}

    void start() { read_loop(); }
    void send(std::string msg, bool at_front = false) {
        post(_s.get_io_service(), [=] { // _s.get_executor() for newest Asio
            if (enqueue(std::move(msg), at_front))
                write_loop();
        });
    }

  private:
    void do_echo() {
        std::string line;
        if (getline(std::istream(&_rx), line)) {
            send(std::move(line) + '\n');
        }
    }

    bool enqueue(std::string msg, bool at_front)
    { // returns true if need to start write loop
        at_front &= !_tx.empty(); // no difference
        if (at_front)
            _tx.insert(std::next(begin(_tx)), std::move(msg));
        else
            _tx.push_back(std::move(msg));

        return (_tx.size() == 1);
    }
    bool dequeue()
    { // returns true if more messages pending after dequeue
        assert(!_tx.empty());
        _tx.pop_front();
        return !_tx.empty();
    }

    void write_loop() {
        ba::async_write(_s, ba::buffer(_tx.front()), [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Tx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                if (!ec && dequeue()) write_loop();
            });
    }

    void read_loop() {
        ba::async_read_until(_s, _rx, "\n", [this,self=shared_from_this()](error_code ec, size_t n) {
                if (s_verbose) std::cout << "Rx: " << n << " bytes (" << ec.message() << ")" << std::endl;
                do_echo();
                if (!ec)
                    read_loop();
            });
    }

    friend struct server;
    ba::streambuf          _rx;
    std::list<std::string> _tx;
    tcp::socket            _s;

    boost::signals2::scoped_connection _subscription;
};

struct server {
    server(ba::io_context& ioc) : _ioc(ioc) {
        _acc.set_option(tcp::acceptor::reuse_address()); // set before bind so it takes effect
        _acc.bind({{}, 6767});
        _acc.listen();
        accept_loop();
    }

    void stop() {
        _ioc.post([=] {
                _acc.cancel();
                _acc.close();
            });
    }

    size_t broadcast(std::string const& msg) {
        _broadcast_event(msg);
        return _broadcast_event.num_slots();
    }

  private:
    boost::signals2::signal<void(std::string const& msg)> _broadcast_event;

    size_t reg_connection(connection& c) {
        c._subscription = _broadcast_event.connect(
                [&c](std::string msg){ c.send(msg, true); }
            );

        return _broadcast_event.num_slots();
    }

    void accept_loop() {
        auto session = std::make_shared<connection>(_acc.get_io_context());
        _acc.async_accept(session->_s, [this,session](error_code ec) {
             auto ep = ec? tcp::endpoint{} : session->_s.remote_endpoint();
             std::cout << "Accept from " << ep << " (" << ec.message() << ")" << std::endl;

             if (!ec) {
                 auto n = reg_connection(*session);

                 session->start();
                 accept_loop();

                 broadcast("player #" + std::to_string(n) + " has entered the game\n");
             }

        });
    }

    ba::io_context& _ioc;
    tcp::acceptor _acc{_ioc, tcp::v4()};
};

int main(int argc, char** argv) {
    s_verbose = argc>1 && argv[1] == "-v"s;

    ba::io_context ioc;

    server s(ioc);

    std::thread th([&ioc] { ioc.run(); }); // todo exception handling

    std::this_thread::sleep_for(1s);

    auto n = s.broadcast("random global event broadcast\n");
    std::cout << "Global event broadcast reached " << n << " active subscribers\n";

    std::this_thread::sleep_for(2s);
    s.stop(); // active connections will continue

    th.join();
}

See the diff between Approach 1. and 2.: Compare View on github

A sample of the output when run against 3 concurrent clients with:

(for a in {1..3}; do netcat localhost 6767 < /etc/dictionaries-common/words > echoed.$a& sleep .1; done; time wait)


sehe
  • > `You forgot about the big picture: you need to know about the set of open connections, one way or the other:` I did not. I already have some state that's per-connection, I can attach to the event source on its construction, and detach on its destruction. That ties the connection state to the listener, and re-uses the fact that the asio loop must already know about all the clients for this as well. – Fake Name Mar 21 '18 at 19:39
  • Well, that's "option 2." then. I merely said that because it really doesn't make much sense to use the "emulated condition variables" that you [link](https://stackoverflow.com/a/17029022/268006) to. In fact you said this about it: _"There is https://stackoverflow.com/a/17029022/268006, which does more or less what I want"_ - That's why I said you forgot about the big picture. [Just because in a threaded concurrency world you'd synchronize using a condition variable, doesn't mean that emulating that in Asio "does more or less what you want".] – sehe Mar 21 '18 at 19:56
  • So back to the answer, am I correct in thinking that you basically say you'd prefer the "option 2." approach [so you don't need explicit connection tracking]? Is there anything else we could clarify? – sehe Mar 21 '18 at 19:56
  • But the point is that *I don't need to synchronize*, I just need to at some point call a function for each listener (with the outgoing buffer being a shared pointer, it avoids the copying too). I do need explicit connection tracking, but pushing that down into the asio system, which is already running, makes much more sense to me than implementing it myself. – Fake Name Mar 21 '18 at 20:38
  • My understanding of how https://stackoverflow.com/a/17029022/268006 works is that basically, when you `cancel()` on the timer, the asio loop just drops calls for each of your registered callbacks into the asio event queue, and then processes them as it can. I can't see how that's structurally different from the manually-track-connections-and-hand-queue-callback-invocations approach that you have as option 1. – Fake Name Mar 21 '18 at 20:44
  • Re "But the point is that _I don't need to synchronize_" - exactly. This is why you don't need the timer trick. Of course you **can** use it, but it' seems a complication. No need to "push it down into the asio system" - just write the code as you would without Asio. The two are _structurally_ different in the fact that there's [Inversion Of Control](https://en.wikipedia.org/wiki/Inversion_of_control). If you're thinking so highlevel that this counts as "no structural difference", then I agree: any implementation with the the same observable behaviour can be said "structurally the same". – sehe Mar 21 '18 at 21:29
  • "just write the code as you would without Asio" But the whole point here is I *do* have asio, so why would you re-implement stuff that's already done? – Fake Name Mar 21 '18 at 21:31
  • @FakeName the assumption is that you are going to duplicate something. Clearly, Asio doesn't have it. So, that point is moot. Coming back to the answer, I find it amusing that I've just reiterated the clarification of the _structural_ differences between option 1. and option 2. that was in my answer from the first time posting. I'm not sure why you're going in circles. Are you just trying to say you'd prefer to have an example of option 2. rather than option 1.? – sehe Mar 21 '18 at 21:34
  • I think the issue here is I disagree about your categorization of the timer trick (it has the overhead and efficiency of option 1). Asio *does* have what I need, it's just kind of a hack, and the only reason I can see that it's not more fully defined by boost::asio is that they just didn't consider this particular use-case. – Fake Name Mar 21 '18 at 21:36
  • That's a valid opinion. Where does that leave us with respect to the question? – sehe Mar 21 '18 at 21:37
  • 1
    I'm not sure. Mostly, I think I'm just mildly irritated at boost. I actually wound up using the timer hack, and it seems to work fine. I was really hoping that there was a boost-native way to have events/subscribers as part of asio (that'd I'd just missed in the documentation, somehow), and apparently there is not. – Fake Name Mar 21 '18 at 21:38
  • Thanks! I guess it's time to mail the boost discussion lists, or similar. – Fake Name Mar 21 '18 at 21:39
  • I looked at it, but boost signal is blocking, as far as I can tell (it's hard to research, because 99% of the google results are about handling ctrl+c). The timer thing is non-blocking and fully integrates with the asio event loop. – Fake Name Mar 21 '18 at 21:41
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/167297/discussion-between-sehe-and-fake-name). – sehe Mar 21 '18 at 21:45
  • I guess I don't see how to use boost.signals without having each connection have a thread that's blocking on the relevant signal? I may be confusing OS signals and boost signals. – Fake Name Mar 21 '18 at 21:47
  • Extensive **UPDATE** to the answer: both approaches fleshed out with demo code on [github](https://github.com/sehe/broadcast_to_sessions) – sehe Mar 23 '18 at 00:03

The answer from @sehe was amazing, so I'll be brief. Generally speaking, to implement an algorithm which operates on all active connections you must do the following:

  • Maintain a list of active connections. If this list is accessed by multiple threads, it will need synchronization (std::mutex). New connections should be inserted to the list, and when a connection is destroyed or becomes inactive it should be removed from the list.

  • To iterate the list, synchronization is required if the list is accessed by multiple threads (i.e. more than one thread calling asio::io_context::run, or if the list is also accessed from threads that are not calling asio::io_context::run)

  • During iteration, if the algorithm needs to inspect or modify the state of any connection, and that state can be changed by other threads, additional synchronization is needed. This includes any internal "queue" of messages that the connection object stores.

  • A simple way to synchronize a connection object is to use boost::asio::post to submit a function for execution on the connection object's context, which will be either an explicit strand (boost::asio::strand, as in the advanced server examples) or an implicit strand (what you get when only one thread calls io_context::run). Approach 1 provided by @sehe uses post to synchronize in this fashion.

  • Another way to synchronize the connection object is to "stop the world." That means call io_context::stop, wait for all the threads to exit, and then you are guaranteed that no other threads are accessing the list of connections. Then you can read and write connection object state all you want. When you are finished with the list of connections, call io_context::restart and launch the threads which call io_context::run again. Stopping the io_context does not stop network activity; the kernel and network drivers still send and receive data from internal buffers. TCP/IP flow control will take care of things, so the application still operates smoothly even though it becomes briefly unresponsive during the "stop the world." This approach can simplify things, but depending on your particular application you will have to evaluate whether it is right for you (see the sketch below this list).
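
A minimal sketch of that "stop the world" idea, under the assumption that ioc is the shared io_context, worker_threads/num_threads describe the threads calling ioc.run(), and sessions is the server's container of weak connection pointers; none of these names come from the code above:

    // Hedged sketch: between join() and the relaunch no handler can be running,
    // so it is safe to inspect or modify connection state directly.
    ioc.stop();                          // ask every ioc.run() to return
    for (auto& t : worker_threads)
        t.join();                        // wait until no handlers are executing
    worker_threads.clear();

    for (auto& weak : sessions)          // operate on all live connections
        if (auto conn = weak.lock())
            conn->send("server notice\n");

    ioc.restart();                       // clear the stopped flag
    for (unsigned i = 0; i < num_threads; ++i)
        worker_threads.emplace_back([&] { ioc.run(); });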

Hope this helps!

Vinnie Falco
  • I wound up using a combination of `boost::signal2` and `boost::asio::post`. Some snippets [here](https://chat.stackoverflow.com/transcript/message/41766442#41766442) – Fake Name Mar 24 '18 at 01:45
  • 1
    And yeah, @sehe went ridiculously above and beyond in his answer. – Fake Name Mar 24 '18 at 01:46

Thank you @sehe for the amazing answer. Still, I think there is a small but severe bug in Approach 2. IMHO reg_connection should look like this:

size_t reg_connection(std::shared_ptr<connection> c) {
    c->_subscription = _broadcast_event.connect(
        [weak_c = std::weak_ptr<connection>(c)](std::string msg){ 
            if(auto c = weak_c.lock())
                c->send(msg, true); 
        }
    );
    return _broadcast_event.num_slots();
}

Otherwise you can end up with a race condition leading to a server crash: if the connection instance is destroyed during the call to the lambda, the captured reference becomes dangling.

Similarly, connection::send() should look like this, because otherwise `this` might be dead by the time the lambda is called:

    void send(std::string msg, bool at_front = false) {
      post(_s.get_io_service(),
        [self=shared_from_this(), msg=std::move(msg), at_front] { 
          if (self->enqueue(std::move(msg), at_front))
              self->write_loop();
        });
    }

PS: I would have posted this as a comment on @sehe's answer, but unfortunately I don't have enough reputation.

Wave
  • This does not provide an answer to the question. Once you have sufficient [reputation](https://stackoverflow.com/help/whats-reputation) you will be able to [comment on any post](https://stackoverflow.com/help/privileges/comment); instead, [provide answers that don't require clarification from the asker](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can-i-do-instead). - [From Review](/review/late-answers/29840201) – isopach Sep 16 '21 at 14:52