
I was digging through the Asio documentation for sockets, but I couldn't find anything useful on how to handle the following situation:

Assume I have many servers in a peer-to-peer network (up to 1000). Servers have to communicate with each other regularly, so I do not want to open a new client connection every time one needs to send a message to another server (huge overhead).

At the same time, creating n threads, one per client -> server connection, is not really viable either.

I'll implement different communication schemes (all-to-all, star, and tree), so 1, log(n), or n of the servers will have to instantiate those n socket clients to connect to the other servers.

Is there a good way to simply do the following (pseudocode)?

pool = ConnectionPool.create(vector<IP>);
pool.sendMessage(ip, message);

I know on the server side I can use an async connection. However, I don't really know how to handle it from the "client" (sender) perspective in C++/Asio.

TL;DR:

Which APIs and classes am I supposed to use when I want to "send" messages to N servers without having to open N connections every time I do that, and without using N threads?

raycons
I don't understand your question, i.e. you are mixing several things up. Using the async reactor pattern or n threads is about concurrency and has nothing to do with peer-to-peer directly. Of course your pseudo code could work, since sendMessage would simply be `for each connection, send message`. Also, Boost.Asio does not care whether it is p2p or not; you would just have both server and client logic in your program. – Superlokkus Jun 12 '20 at 12:44
  • Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections. – raycons Jun 12 '20 at 13:31
No, not if you stick to the parts of the Asio API not explicitly marked as synchronous. You could even use 1 thread. ASIO, as in Asynchronous Input/Output, has as its main purpose to offer an async, i.e. reactor, pattern for IO. That means you don't have to use k or n threads, since there is no blocking API call. Using more than 1 thread could help to avoid too much work queuing up, but that's CPU/system bound. – Superlokkus Jun 12 '20 at 14:03
That is exactly the question of the post: which APIs and classes am I supposed to use when I want to "send" messages to N servers without having to open N connections every time I do that, and without using N threads? – raycons Jun 12 '20 at 14:11
  • Are we talking UDP or TCP? – Superlokkus Jun 12 '20 at 14:14
  • I want to use TCP. – raycons Jun 12 '20 at 14:16

2 Answers


Yes, each process will need a server side (to receive messages from any of the n participants) and one client side (to send messages to any of the n participants). However, as far as I could find in Asio, the only way to send messages to k of the n participants is by creating k threads with k connections

Then you must not have looked in the right place, or not very far at all.

A core tenet of async IO is multiplexing IO on a single thread (all of the kqueue/epoll/select/IO completion port abstractions are geared towards that goal).

Here's an absolutely lazy-coded demonstration that shows:

  • single threaded everything
  • a listener that accepts unbounded clients (we could easily add additional listeners)
  • we connect to a collection of "peers"
  • on a heartbeat interval we send all the peers a heartbeat message

        for (auto& peer : peers)
            async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
            });
    
  • additionally it handles asynchronous process signals (INT, TERM) to shutdown all the async operations

"Live¹" On Coliru

#include <boost/asio.hpp>
#include <list>
#include <iostream>
using std::tuple;
using namespace std::literals;

template <typename T>
static auto reference_eq(T const& obj) {
    return [p=&obj](auto& ref) { return &ref == p; };
}

int main() {
    using namespace boost::asio; // don't be this lazy please
    using boost::system::error_code;
    using ip::tcp;

    io_context ioc;
    tcp::acceptor listener(ioc, {{}, 6868});
    listener.set_option(tcp::acceptor::reuse_address(true));
    listener.listen();

    using Loop = std::function<void()>;

    std::list<tcp::socket> clients, peers;

    // accept unbounded clients
    Loop accept_loop = [&] {
        listener.async_accept([&](error_code const& ec, tcp::socket s) {
            if (!ec) {
                std::cout << "New session " << s.remote_endpoint() << std::endl;
                clients.push_back(std::move(s));
                accept_loop();
            }
        });
    };

    tcp::resolver resolver(ioc);
    for (auto [host,service] : {
                tuple{"www.example.com", "http"}, 
                {"localhost", "6868"}, 
                {"::1", "6868"}, 
                // ...
            })
    {
        auto& p = peers.emplace_back(ioc);
        async_connect(p, resolver.resolve(host,service), [&,spec=(host+":"s+service)](error_code ec, auto...) {
            std::cout << "For " << spec << " (" << ec.message() << ")";
            if (!ec)
                std::cout << " " << p.remote_endpoint();
            else
                peers.remove_if(reference_eq(p));
            std::cout << std::endl;
        });
    }

    std::string const message = "heartbeat\n";
    high_resolution_timer timer(ioc);
    Loop heartbeat = [&]() mutable {
        timer.expires_from_now(2s);
        timer.async_wait([&](error_code ec) {
            std::cout << "heartbeat " << ec.message() << std::endl;
            if (ec)
                return;
            for (auto& peer : peers)
                async_write(peer, buffer(message), [ep=peer.remote_endpoint(ec)](error_code ec, size_t xfr) {
                    std::cout << "(sent " << xfr << " bytes to " << ep << "(" << ec.message() << ")" << std::endl;
                });
            heartbeat();
        });
    };

    signal_set sigs(ioc, SIGINT, SIGTERM);
    sigs.async_wait([&](error_code ec, int sig) {
        if (!ec) {
            std::cout << "signal: " << strsignal(sig) << std::endl;
            listener.cancel();
            timer.cancel();
        } });

    accept_loop();
    heartbeat();

    ioc.run_for(10s); // max time for Coliru, or just `run()`
}

Prints (on my system):

New session 127.0.0.1:46730
For localhost:6868 (Success) 127.0.0.1:6868
For ::1:6868 (Connection refused)
For www.example.com:http (Success) 93.184.216.34:80
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
heartbeat Success
(sent 10 bytes to 93.184.216.34:80(Success)
(sent 10 bytes to 127.0.0.1:6868(Success)
^Csignal: Interrupt
heartbeat Operation canceled

Note how the one client ("New session") is our own peer connection on localhost:6868 :)

Of course, in real life you would have a class to represent a client session, perhaps have queues for messages pending sending, and optionally run on multiple threads (using strands to synchronize access to shared objects).

OTHER SAMPLES

If you really wish to avoid an explicit collection of clients, see this very similar demo: How to pass a boost asio tcp socket to a thread for sending heartbeat to client or server, which:

  • also starts single-threaded, but adds a thread pool for strand demonstration purposes
  • has a heartbeat timer per session, meaning each session can have its own frequency

¹ It's not working on Coliru because of limited network access. A loopback-only version without resolver use works: Live On Coliru

sehe
  • Okay, so basically, we create a method that receives incoming connections and pushes them into a list so the connection is not closed. Then we create a connection to each peer (each other server) and also store that so we can heartbeat those. After that I just gotta take out one of the peer objects and async_write to it, to handle that. I think this is exactly what I was looking for ty. I was mainly looking in the official Asio resources. Are there other good related resources you can point me to? – raycons Jun 12 '20 at 16:04
  • That's what I chose to do here, because life times are simple and single-threaded makes it cheap to have a centralized collection. – sehe Jun 12 '20 at 16:09
  • 1
    I'm sad to say I have 3 or 4 books on Asio and none stands out as a particular good learning book (most show many anti-patterns and some even outright bugs). I learn from [the people on SO](https://stackoverflow.com/tags/boost-asio/topusers), there's https://cpplang.now.sh/ – sehe Jun 12 '20 at 16:14
  • `auto& p = peers.emplace_back(ioc);` seems not to work for me (I'm using C++14); in that case emplace_back has a void return. – raycons Jun 12 '20 at 16:43
  • That's older standard libraries (see https://en.cppreference.com/w/cpp/container/list/emplace_back, "since c++17"). You can use something like `l.emplace_back(...); auto& p = l.back();` – sehe Jun 12 '20 at 17:43
  • Wow what a nice elaborated answer, I have no choice but to upvote yours. – Superlokkus Jun 14 '20 at 18:01

Since you stated you want to use TCP, i.e. a connection-based protocol, you can use the async Asio API and could rely on a single thread, because async, i.e. reactor-pattern, calls do not block.

Your server would use boost::asio::async_write on a boost::asio::ip::tcp::socket, which corresponds to one TCP connection. The callback you give async_write as a parameter will be called when sending is done, but async_write itself returns immediately. Receiving works similarly on the client. To obtain a TCP connection from an incoming client, you would use a boost::asio::ip::tcp::acceptor, initialized with a boost::asio::ip::tcp::endpoint, and call boost::asio::ip::tcp::acceptor::async_accept on the server side; on the client side you would resolve the server's name via boost::asio::ip::tcp::resolver::async_resolve and then connect with boost::asio::async_connect. Actually you might need 2 acceptors, one for IPv4 and one for IPv6.

Since a TCP connection carries some state on the server side, you would ordinarily have to track it in a central place. To avoid this contention and ease the pattern, it's common to use a class which inherits from std::enable_shared_from_this and passes a std::shared_ptr to itself into the callbacks given to boost::asio::async_write, so that, between sending and receiving, while no thread is blocked in the usual sense, the connection object is not forgotten, i.e. deleted.

For reading I recommend boost::asio::async_read_until, and in general a boost::asio::streambuf.

With this, 1 thread running boost::asio::io_context::run in a loop would suffice; it wakes up every time one of the many connections has received something that needs processing, or something new has to be sent.

The general project is a bit out of scope; it would help if you could narrow your question a bit, or better, read the talks and examples. I have written something similar to what you intend, a resilient overlay network: https://github.com/Superlokkus/code

Superlokkus
  • But do I have to create N objects of the type boost::asio::ip::tcp::socket and put them in a vector and then call them when I need them? – raycons Jun 12 '20 at 15:04
  • No, that's thanks to the `shared_from_this` pattern: you would give `boost::asio::ip::tcp::acceptor::async_accept` a callback that creates a new instance of your class that represents a connection. It does not have to put that instance into anything, because your connection class will start the first async receive via `boost::asio::async_read_until`, and that callback object, a lambda for example with a `[me=shared_from_this()]` capture, will hold a `std::shared_ptr` instance until you got data or the connection is closed. You would have no central connection dictionary. – Superlokkus Jun 12 '20 at 15:13
  • Of course, if these connections manipulate some global state, you would have to give them a reference at creation, which you have to synchronize on if you have more than 1 thread running `io_context::run`, via a `boost::asio::io_context::strand`, `std::atomic`s, or a mutex. – Superlokkus Jun 12 '20 at 15:16
  • Basically you have to think of the whole thing as user-land multithreading with cooperative scheduling, which fits perfectly with the async IO APIs native to most operating systems; otherwise those would have to block threads instead of just posting a userland task, i.e. a callback invocation, on a queue from the interrupt caused by your network adapter/card. – Superlokkus Jun 12 '20 at 15:18
  • 1
    See my answer for a demo. If you really wish to avoid an explicit collection of clients, see [this very similar demo](https://stackoverflow.com/a/62090362/85371) which also starts from single-threaded, but adds a threadpool for strand demonstration purposes). It has a heartbeat timer per session meaning that each session can have their own frequency. This may actually be what you wanted, or maybe not. – sehe Jun 12 '20 at 15:46