I am making a few attempts at writing my own simple asynchronous TCP server using boost::asio after not having touched it for several years.

The latest example listing I can find is: http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/tutorial/tutdaytime3/src.html

The problem I have with this example listing is that (I feel) it cheats and it cheats big, by making the tcp_connection a shared_ptr, such that it doesn't worry about the lifetime management of each connection. (I think) They do this for brevity, since it is a small tutorial, but that solution is not real world.

What if you wanted to send a message to each client on a timer, or something similar? A collection of client connections is going to be necessary in any real world non-trivial server.

I am worried about the lifetime management of each connection. I figure the natural thing to do would be to keep some collection of tcp_connection objects, or pointers to them, inside tcp_server, adding to that collection from the OnConnect callback and removing from it in OnDisconnect.

Note that OnDisconnect would most likely be called from an actual Disconnect method, which in turn would be called from OnReceive callback or OnSend callback, in the case of an error.

Well, therein lies the problem.

Consider we'd have a callstack that looked something like this:

tcp_connection::~tcp_connection
tcp_server::OnDisconnect
tcp_connection::OnDisconnect
tcp_connection::Disconnect
tcp_connection::OnReceive

This would cause errors as the call stack unwinds, because we would be executing code in an object that has already had its destructor called... I think, right?

I imagine everyone doing server programming comes across this scenario in some fashion. What is a strategy for handling it?

I hope the explanation is good enough to follow. If not let me know and I will create my own source listing, but it will be very large.


Edit: Related

Memory management in asynchronous C++ code

IMO not an acceptable answer: it relies on cheating with a shared_ptr held by outstanding receive calls and nothing more, and is not real world. What if the server wanted to say "Hi" to all clients every 5 minutes? A collection of some kind is necessary. What if you are calling io_service.run on multiple threads?

I am also asking on the boost mailing list: http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

Christopher Pisz
  • Gee. Why all the big words. "Cheating". "Cheating big!". "Not real world!". I don't think they use shared_ptr "for brevity" - that's your words only. I use shared_ptr in my own application. And yes, I keep a collection of `weak_ptr` to keep track of them too. What is the problem with that? Are my servers "not real world"? – sehe Apr 05 '17 at 23:02
  • `shared_ptr` is not cheating and it's very real-world. It exists for a reason, and it's to be used in the real world in situations like this. – Puppy Apr 06 '17 at 07:30
  • Why do you think that having a `shared_ptr` means you have no collection? Does `std::vector<std::shared_ptr<tcp_connection>>` seem so unbelievable to you? – Drax Apr 06 '17 at 08:36
  • Look fellas, I am not against using shared_ptrs in your applications. I am pointing out that relying on an outstanding receive call rather than managing the lifetime of a connection yourself is cheating. Having a collection of weak_ptrs did not occur to me. – Christopher Pisz Apr 06 '17 at 14:14
  • @drax Yes, std::vector<std::shared_ptr<tcp_connection>> seems absolutely unviable to me. If you keep an outstanding reference to the connection in your collection, it never gets destroyed. If you rely on removing it from the collection to destroy it, you've just moved the problem rather than solving it. It then becomes "When can I safely remove the ptr from my collection in order to destroy it" rather than "When can I destroy it"; you cannot safely remove it while in an async callback and have the stack unwind. – Christopher Pisz Apr 06 '17 at 14:31
  • @ChristopherPisz You can safely remove the shared_ptr at any time you notice you don't need the connection anymore. It will not be destroyed at that instant if it is used anywhere else, for the very reason that you are using shared_ptrs: the connection won't get destroyed until all actions requiring it have completed, provided they correctly copied the connection pointer. The problem of "when to remove the connection" exists no matter how you store it. – Drax Apr 07 '17 at 12:45
  • @Drax now that I think about it, I might see your point. Maybe weak_ptr isn't even necessary either. In that case, the original answer on the other post is correct. What should we do with this question then, since we probably don't want to lose the extra information we've gathered? – Christopher Pisz Apr 07 '17 at 15:51
  • @ChristopherPisz Keep it up, it's useful for people who might wonder about the same problem :) – Drax Apr 10 '17 at 09:05

4 Answers

4

Like I said, I fail to see how using smart pointers is "cheating, and cheating big". I also do not think your assessment that "they do this for brevity" holds water.


Here's a slightly redacted excerpt¹ from our code base that exemplifies how using shared_ptrs doesn't preclude tracking connections.

It shows just the server side of things, with

  • a very simple connection object in connection.hpp; this uses enable_shared_from_this

  • just the fixed size connection_pool (we have dynamically resizing pools too, hence the locking primitives). Note how we can do actions on all active connections.

    So you'd trivially write something like this to write to all clients, like on a timer:

    _pool.for_each_active([] (auto const& conn) {
        send_message(conn, hello_world_packet);
    });
    
  • a sample listener that shows how it ties in with the connection_pool (which has a sample method to close all connections)

Code Listings

  • connection.hpp

    #pragma once
    
    #include "xxx/net/rpc/protocol.hpp"
    #include "log.hpp"
    #include "stats_filer.hpp"
    #include <boost/asio.hpp>
    #include <memory>
    
    namespace xxx { namespace net { namespace rpc {
    
        struct connection : std::enable_shared_from_this<connection>, protected LogSource {
            typedef std::shared_ptr<connection> ptr;
    
          private:
            friend struct io;
            friend struct listener;
    
            boost::asio::io_service& _svc;
            protocol::socket _socket;
            protocol::endpoint _ep;
            protocol::endpoint _peer;
          public:
    
            connection(boost::asio::io_service& svc, protocol::endpoint ep)
                : LogSource("rpc::connection"),
                  _svc(svc),
                  _socket(svc),
                  _ep(ep)
            {}
    
            void init() {
                _socket.set_option(protocol::no_delay(true));
                _peer = _socket.remote_endpoint();
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
                debug() << "New connection from " << _peer;
            }
    
            protocol::endpoint endpoint() const { return _ep;     } 
            protocol::endpoint peer() const     { return _peer;   } 
            protocol::socket&  socket()         { return _socket; } 
    
            // TODO encapsulation
            int handle() {
                return _socket.native_handle();
            }
    
            bool valid() const { return _socket.is_open(); }
    
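            void cancel() {
                // Capture a shared_ptr rather than `this`: the posted handler may
                // run after the last external owner has released the connection.
                _svc.post([self = shared_from_this()] { self->_socket.cancel(); });
            }
    
            using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
            void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
                // Same reasoning as cancel(): keep the connection alive until the handler has run.
                _svc.post([self = shared_from_this(), what] { self->_socket.shutdown(what); });
            }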
    
            ~connection() {
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
            }
        };
    
    } } }
    
  • connection_pool.hpp

    #pragma once
    
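    #include <algorithm>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <type_traits>
    #include <vector>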
    #include "xxx/threads/null_mutex.hpp"
    #include "xxx/net/rpc/connection.hpp"
    #include "stats_filer.hpp"
    #include "log.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        // not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it
        template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
        struct basic_connection_pool : LogSource {
            using WeakPtr = std::weak_ptr<typename Ptr::element_type>;
    
            // `name` cannot have a default argument while the trailing `size` has none
            basic_connection_pool(std::string name, size_t size)
                : LogSource(std::move(name)), _pool(size) 
            { }
    
            bool try_insert(Ptr const& conn) {
                std::lock_guard<Mutex> lk(_mx);
    
                auto slot = std::find_if(_pool.begin(), _pool.end(), std::mem_fn(&WeakPtr::expired));
    
                if (slot == _pool.end()) {
                    g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
                    error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
                    return false;
                }
    
                *slot = conn;
                return true;
            }
    
            template <typename F>
            void for_each_active(F action) {
                auto locked = [=] {
                    using namespace std;
                    lock_guard<Mutex> lk(_mx);
                    vector<Ptr> locked(_pool.size());
                    transform(_pool.begin(), _pool.end(), locked.begin(), mem_fn(&WeakPtr::lock));
                    return locked;
                }();
    
                for (auto const& p : locked)
                    if (p) action(p);
            }
    
            constexpr static bool synchronizing() {
                return not std::is_same<xxx::threads::null_mutex, Mutex>();
            }
    
          private:
            void dump_stats(LogSource::LogTx tx) const {
                // lock is assumed!
                size_t empty = 0, busy = 0, idle = 0;
    
                for (auto& p : _pool) {
                    switch (p.use_count()) {
                        case 0:  empty++; break;
                        case 1:  idle++;  break;
                        default: busy++;  break;
                    }
                }
    
                tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
            }
    
            Mutex _mx;
            std::vector<WeakPtr> _pool;
        };
    
        // TODO FIXME use null_mutex once growing is no longer required AND if
        // en-pooling still only happens from the single IO thread (XXX-2535)
        using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;
    
    } } }
    
  • listener.hpp

    #pragma once
    
    #include "xxx/threads/null_mutex.hpp"
    #include <fcntl.h>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include "xxx/net/rpc/connection_pool.hpp"
    #include "xxx/net/rpc/io_operations.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        struct listener : std::enable_shared_from_this<listener>, LogSource {
            typedef std::shared_ptr<listener> ptr;
    
            protocol::acceptor _acceptor;
            protocol::endpoint _ep;
    
            listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool) 
                : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
            {
                _acceptor.open(ep.protocol());
    
                _acceptor.set_option(protocol::acceptor::reuse_address(true));
                _acceptor.set_option(protocol::no_delay(true));
                ::fcntl(_acceptor.native_handle(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
                _acceptor.bind(ep);
    
                _acceptor.listen(32);
            }
    
            void accept_loop(std::function<void(connection::ptr conn)> on_accept) {
    
                auto self = shared_from_this();
                auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);
    
                _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
                    if (ec) {
                        auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
                        tx << "failed accept " << ec.message();
                    } else {
                        ::fcntl(conn->_socket.native_handle(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
    
                        if (_pool.try_insert(conn)) {
                            on_accept(conn);
                        }
    
                        self->accept_loop(on_accept);
                    }
                });
            }
    
            void close() {
                _acceptor.cancel();
                _acceptor.close();
    
                _acceptor.get_io_service().post([=] {
                    _pool.for_each_active([] (auto const& sp) {
                        sp->shutdown(connection::shutdown_type::shutdown_both);
                        sp->cancel();
                    });
                });
    
                debug() << "shutdown";
            }
    
            ~listener() {
            }
    
          private:
            server_connection_pool& _pool;
        };
    
    } } }
    

¹ download as gist https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2
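
For readers who want to try the weak_ptr tracking idea without the rest of the code base, here is a minimal standard-library-only sketch. Everything in it (`demo_connection`, `demo_pool`, `broadcast_demo`) is a hypothetical stand-in invented for illustration, not part of the listings above; unlike the fixed-size pool above, it simply grows when no expired slot is available, and it does no locking.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a connection; a real one would own a socket.
struct demo_connection {
    std::vector<std::string> outbox;  // messages "sent" to this client
    void send(std::string msg) { outbox.push_back(std::move(msg)); }
};

// Tracks connections without owning them (assumption: single-threaded use).
class demo_pool {
  public:
    void insert(std::shared_ptr<demo_connection> const& conn) {
        // Reuse a slot whose connection has died, otherwise grow.
        for (auto& slot : slots_) {
            if (slot.expired()) { slot = conn; return; }
        }
        slots_.push_back(conn);
    }

    // Runs `action` on every connection that is still alive and
    // returns how many were visited.
    template <typename F>
    std::size_t for_each_active(F action) {
        std::size_t visited = 0;
        for (auto const& slot : slots_) {
            if (auto sp = slot.lock()) {  // temporarily co-own the connection
                action(sp);
                ++visited;
            }
        }
        return visited;
    }

  private:
    std::vector<std::weak_ptr<demo_connection>> slots_;
};

// Broadcasts "Hi" to whoever is still connected; returns the number reached.
inline std::size_t broadcast_demo() {
    demo_pool pool;
    auto a = std::make_shared<demo_connection>();
    auto b = std::make_shared<demo_connection>();
    pool.insert(a);
    pool.insert(b);

    b.reset();  // "disconnect": the last owner goes away, the pool does not keep it alive

    std::size_t reached = pool.for_each_active(
        [](std::shared_ptr<demo_connection> const& c) { c->send("Hi"); });
    assert(a->outbox.size() == 1);
    return reached;
}
```

The point of the sketch is that the pool never extends a connection's lifetime: the only owners are the application and, in a real server, the outstanding handlers.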

sehe
  • PS. I tend to use a nifty small utility for lifetime synchronization too: http://paste.ubuntu.com/24323983/. This way an RPC server and its background schedulers can easily coordinate shutdown of running tasks and other things that keep a reference to anything bound to the server's lifetime. (Note the fail safe so that a bug will not lead to indefinite shutdown hang) – sehe Apr 05 '17 at 23:48
3

While others have answered along the lines of the second half of this answer, the most complete answer I could find came from asking the same question on the Boost mailing list.

http://boost.2283326.n4.nabble.com/How-to-design-proper-release-of-a-boost-asio-socket-or-wrapper-thereof-td4693442.html

I will summarize here in order to assist those that arrive here from a search in the future.

There are two options:

1) Close the socket to cancel any outstanding I/O, then post a callback for the post-disconnection logic on the io_service, so that the server class is called back once the socket has been disconnected. It can then safely release the connection. As long as only one thread has called io_service::run, the other asynchronous operations will already have been resolved when the callback is made. However, if multiple threads have called io_service::run, this is not safe.

2) As others have pointed out in their answers, using shared_ptr to manage the connection's lifetime, with outstanding I/O operations keeping it alive, is viable. We can keep a collection of weak_ptr to the connections in order to access them when we need to. The latter is the tidbit that had been omitted from other posts on the topic, which confused me.
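
Option 1 can be sketched roughly as follows. This is only an illustration under stated assumptions: `toy_queue` is a hand-rolled, single-threaded stand-in for io_service (its `post` and `run` merely mimic the real ones), and `toy_server`, `toy_connection` and `deferred_release_demo` are hypothetical names. The error path posts the release instead of performing it inline, so the destructor never runs while a member function of the connection is still on the call stack.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy single-threaded stand-in for io_service: post() queues, run() drains.
struct toy_queue {
    std::deque<std::function<void()>> handlers;
    void post(std::function<void()> h) { handlers.push_back(std::move(h)); }
    void run() {
        while (!handlers.empty()) {
            auto h = std::move(handlers.front());
            handlers.pop_front();
            h();
        }
    }
};

struct toy_server;

struct toy_connection {
    toy_connection(toy_queue& q, toy_server& s, bool& destroyed)
        : queue_(q), server_(s), destroyed_(destroyed) {}
    ~toy_connection() { destroyed_ = true; }

    void on_receive_error();  // simulates OnReceive noticing an error

  private:
    toy_queue& queue_;
    toy_server& server_;
    bool& destroyed_;  // lets the demo observe when the destructor ran
};

struct toy_server {
    std::vector<std::unique_ptr<toy_connection>> connections;

    // Destroys the given connection by erasing its owning pointer.
    void release(toy_connection* c) {
        connections.erase(
            std::remove_if(connections.begin(), connections.end(),
                           [c](std::unique_ptr<toy_connection> const& p) {
                               return p.get() == c;
                           }),
            connections.end());
    }
};

inline void toy_connection::on_receive_error() {
    // Do not ask the server to destroy us here: this function is still running.
    // Queue the release so it happens after the stack has unwound.
    toy_server& server = server_;
    toy_connection* self = this;
    queue_.post([&server, self] { server.release(self); });
    assert(!destroyed_);  // still alive while this member function executes
}

// Returns true if the connection survived its own handler and was
// destroyed only when the posted release ran.
inline bool deferred_release_demo() {
    bool destroyed = false;
    toy_queue q;
    toy_server server;
    server.connections.push_back(
        std::make_unique<toy_connection>(q, server, destroyed));

    server.connections.front()->on_receive_error();
    bool alive_after_handler = !destroyed;

    q.run();  // now the posted release executes
    return alive_after_handler && destroyed && server.connections.empty();
}
```

As noted above, this ordering argument only holds when a single thread drains the queue; with several threads calling io_service::run, another handler for the same connection could still be in flight when the release runs.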

Christopher Pisz
1

The way that asio solves the "deletion problem" when there are outstanding async methods is that it splits each async-enabled object into 3 classes, e.g.:

  • server
  • server_service
  • server_impl

There is one service per io_loop (see use_service<>). The service creates an impl for the server, which is now a handle class.

This has separated the lifetime of the handle and the lifetime of the implementation.

Now, in the handle's destructor, a message can be sent (via the service) to the impl to cancel all outstanding IO.

The handle's destructor is free to wait for those io calls to be queued if necessary (for example if the server's work is being delegated to a background io loop or thread pool).

It has become a habit of mine to implement all io_service-enabled objects this way, as it makes coding with asio very much simpler.
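
A very rough, standard-library-only sketch of the handle/service/impl separation might look like this. It is not Asio's actual internal layout; `sketch_impl`, `sketch_service`, `sketch_handle` and `handle_service_impl_demo` are hypothetical names, and the "outstanding operation" is modelled as nothing more than a counter. The only thing it tries to show is that destroying the handle cancels the implementation while the service keeps the implementation alive until its pending work has drained.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Implementation: owns the real state and may outlive the handle.
struct sketch_impl {
    bool cancelled = false;
    int pending_ops = 0;
};

// Service: creates implementations and keeps track of them.
class sketch_service {
  public:
    std::shared_ptr<sketch_impl> create() {
        auto impl = std::make_shared<sketch_impl>();
        impls_.push_back(impl);
        return impl;
    }

    // Called from the handle's destructor: mark as cancelled, then drop the
    // impl only if nothing is outstanding.
    void destroy(std::shared_ptr<sketch_impl> const& impl) {
        impl->cancelled = true;
        reap();
    }

    // Called when an outstanding operation finishes (or is aborted).
    void complete_one(std::shared_ptr<sketch_impl> const& impl) {
        if (impl->pending_ops > 0) --impl->pending_ops;
        reap();
    }

    std::size_t live_impls() const { return impls_.size(); }

  private:
    // Forget every impl that is cancelled and has no pending operations.
    void reap() {
        impls_.erase(
            std::remove_if(impls_.begin(), impls_.end(),
                           [](std::shared_ptr<sketch_impl> const& p) {
                               return p->cancelled && p->pending_ops == 0;
                           }),
            impls_.end());
    }

    std::vector<std::shared_ptr<sketch_impl>> impls_;
};

// Handle: the object the user creates and destroys.
class sketch_handle {
  public:
    explicit sketch_handle(sketch_service& svc) : svc_(svc), impl_(svc.create()) {}
    ~sketch_handle() { svc_.destroy(impl_); }
    sketch_handle(sketch_handle const&) = delete;
    sketch_handle& operator=(sketch_handle const&) = delete;

    // Starts a pretend async operation; the returned pointer plays the role
    // of the state captured by the completion handler.
    std::shared_ptr<sketch_impl> start_op() {
        assert(!impl_->cancelled);
        ++impl_->pending_ops;
        return impl_;
    }

  private:
    sketch_service& svc_;
    std::shared_ptr<sketch_impl> impl_;
};

// Returns true if the impl lingered after the handle died and was
// reclaimed once its pending operation completed.
inline bool handle_service_impl_demo() {
    sketch_service svc;
    std::shared_ptr<sketch_impl> op;
    {
        sketch_handle h(svc);
        op = h.start_op();
    }  // handle destroyed here with one operation outstanding
    bool lingering = svc.live_impls() == 1 && op->cancelled;
    svc.complete_one(op);
    return lingering && svc.live_impls() == 0;
}
```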

Richard Hodges
  • This sounds good on paper, but I can't quite visualize it. Do you have a very bare bones simple example you can link? – Christopher Pisz Apr 05 '17 at 19:17
  • @RichardHodges I'm also pretty curious whether you can show an example. Most of the things you mention never occurred to me outside the realm of Asio extensions. – sehe Apr 06 '17 at 23:07
  • @sehe will knock something together after the weekend. I am on a delivery deadline at the Moment. But happy to share my experiences. – Richard Hodges Apr 06 '17 at 23:29
  • Thank you. I've marked the question as fav, although you might elect to post somewhere else if you think it better serves the purpose – sehe Apr 06 '17 at 23:37
  • @RichardHodges It's holiday season where I live. Anything you can point me to give this a whirl for myself? – sehe Jun 29 '17 at 11:15
  • @sehe I am in the middle of delivering a hard deadline at the moment. If you look at the source code of asio you'll see that all io objects are implemented as a trio of handle, service and implementation. There is a one to one relationship between the service and the io_service. The service can keep track of all instances that it manages. It can also get hold of other services it might need via a call to use_service on its owning io loop. Thus, services can serve as caches of instances. They can track instances with shared/weak pointers if necessary. Note that from the users point of view – Richard Hodges Jun 29 '17 at 16:58
  • @sehe the lifetime of an io object is governed by the lifetime of the handle class. The internal implementation can live longer, and is under control of the service. This allows objects with complex async cleanup operations to be simply deleted by the user while the service cleans up in the background. An example might be a clean shutdown of a web socket over ssl. There is the websocket shutdown protocol, the ssl shutdown protocol followed by the shutdown of the underlying socket. These can be handled by 3 separate services. – Richard Hodges Jun 29 '17 at 17:01
  • @sehe see latest commit. goblins implemented as state machines on a background thread pool, includes boost::future continuation compatibility. Want to make a fun game? https://github.com/madmongo1/goblins – Richard Hodges Jul 08 '17 at 18:46
  • @ChristopherPisz I (eventually) created a full demo here: https://github.com/madmongo1/goblins – Richard Hodges Jul 08 '17 at 18:50
  • @sehe another example of the value of separation of objects into handle, service and impl: https://stackoverflow.com/a/45120652/2015579 – Richard Hodges Jul 15 '17 at 16:59
1

Connection lifetime is a fundamental issue with boost::asio. Speaking from experience, I can assure you that getting it wrong causes "undefined behaviour"...

The asio examples use shared_ptr to ensure that a connection is kept alive whilst it may have outstanding handlers in an asio::io_service. Note that even in a single thread, an asio::io_service runs asynchronously to the application code, see CppCon 2016: Michael Caisse "Asynchronous IO with Boost.Asio" for an excellent description of the precise mechanism.

A shared_ptr enables the lifetime of a connection to be controlled by the shared_ptr instance count. IMHO it's not "cheating and cheating big", but an elegant solution to a complicated problem.
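
To make the instance-count mechanism concrete, here is a small sketch that uses only the standard library. A plain `std::function` stands in for a completion handler queued in an io_service, and `counted_connection`, `make_read_handler` and `handler_keeps_alive_demo` are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>

struct counted_connection : std::enable_shared_from_this<counted_connection> {
    explicit counted_connection(bool& destroyed) : destroyed_(destroyed) {}
    ~counted_connection() { destroyed_ = true; }

    // Returns a "completion handler" that co-owns the connection, the same
    // way a handler bound with shared_from_this() does.
    std::function<void()> make_read_handler() {
        auto self = shared_from_this();
        return [self] { self->reads_completed += 1; };
    }

    int reads_completed = 0;

  private:
    bool& destroyed_;  // lets the demo observe when the destructor ran
};

// Returns true if the connection outlived the application's reference
// for exactly as long as a handler was pending.
inline bool handler_keeps_alive_demo() {
    bool destroyed = false;
    auto conn = std::make_shared<counted_connection>(destroyed);

    std::function<void()> pending = conn->make_read_handler();
    assert(conn.use_count() == 2);  // the application plus the pending handler

    conn.reset();                   // the application drops its reference
    bool alive_while_pending = !destroyed;

    pending();                      // the handler runs against a live object
    pending = nullptr;              // handler destroyed: the last owner is gone
    return alive_while_pending && destroyed;
}
```

The object is destroyed when the last handler goes away, never in the middle of one, which is why no handler can find itself executing inside a dead connection.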

However, I agree with you that just using shared_ptr's to control connection lifetimes is not a complete solution since it can lead to resource leaks.

In my answer here: Boost async_* functions and shared_ptr's, I proposed using a combination of shared_ptr and weak_ptr to manage connection lifetimes. An HTTP server using a combination of shared_ptr's and weak_ptr's can be found here: via-httplib.

The HTTP server is built upon an asynchronous TCP server which uses a collection of (shared_ptr's to) connections, created on connects and destroyed on disconnects as you propose.

kenba
  • Mmm. If the solution is only for single-threaded use, a weak_ptr completely fails to add anything over a raw pointer. Most importantly, it's Undefined Behaviour if the lifetime actually ended. I can't condone this idea. – sehe Apr 06 '17 at 07:27
  • @sehe I think that you have misunderstood my answer. The solution is certainly **not** only for single threaded solutions. It works with multiple connections handled by multiple threads whilst enabling a server to control the lifetime of the connections. – kenba Apr 06 '17 at 08:38