
The code below is intended to do the following: I have a resolver object that wraps Boost.Asio. The resolver object holds the io_service and a work object so that io_service::run() never returns. As long as the resolver object is alive, async requests can be made. When the resolver object goes out of scope while requests are still queued, I want all of them to finish before the resolver object is destroyed.

In this case no handler is called at all, and I don't know why. I think there might be a problem with the shared pointers and some dependency cycle. Running with valgrind reports "possibly lost memory".

Any ideas how to make this work so that the resolver object stays alive until all work is done?

#include <boost/asio.hpp>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <iostream>

struct Resolver : public std::enable_shared_from_this<Resolver> {
    boost::asio::io_service                        io_service;
    std::unique_ptr<boost::asio::io_service::work> work;
    std::unique_ptr<std::thread>                   iothread;

    struct Query : public std::enable_shared_from_this<Query>{
        std::shared_ptr<Resolver>                                       service;
        boost::asio::ip::tcp::resolver                                  resolver;
        boost::asio::ip::tcp::resolver::query                           query;
        std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler;

        Query(std::shared_ptr<Resolver> res, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler, const std::string &name) : resolver(res->io_service), query(name, ""), handler(handler) {
            service = res;

        }

        void start() {
                auto self = shared_from_this();
                resolver.async_resolve(query, [self](const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator iterator){
                    self->handler(iterator);
                });     
        }
    };

    Resolver() {
        work.reset(new boost::asio::io_service::work(io_service));
        iothread.reset(new std::thread(std::bind(&Resolver::io, this)));
    }

    ~Resolver() {
        std::cout << "Resolver destroyed" << std::endl;
        work.reset();
        iothread->join();
    }

    void io() {
        io_service.run();
    }

    void asyncResolve(const std::string &name, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> fn) {
        auto query = std::make_shared<Query>(shared_from_this(), fn, name);
        query->start();
    }
};

void test(boost::asio::ip::tcp::resolver::iterator it) {
    std::cout << "Test" << std::endl;
    std::cout << it->endpoint().address().to_string() << std::endl;
}

int main(int argc, const char **argv) {
    auto res = std::make_shared<Resolver>();
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
}
sehe
Gustavo

1 Answer


Just running the service (io_service::run()) already ensures that all asynchronous operations have completed (see the documentation).

You already do this on the worker thread, and you join that thread, so you should be fine!

The only exception would be if a handler throws, so to be extra precise you should handle exceptions from run() (see "Should the exception thrown by boost::asio::io_service::run() be caught?"):

void io() { 
    // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
    for (;;) {
        try {
            io_service.run();
            break; // exited normally
        } catch (std::exception const &e) {
            std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
        } catch (...) {
            std::cerr << "[Resolver] An unexpected error occurred\n";
        }
    }
}

So... Where's The Problem?

The problem is quite finicky and hides between threads and shared_ptr.

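The shared pointer causes ~Resolver to run on the worker thread: each pending Query holds a shared_ptr to the Resolver, so once main() releases its reference, the last owner is a completion handler, and the destructor runs when that handler finishes. This means that you cannot join() the worker thread there (a thread can never join itself). A good implementation will throw an exception, and because it escapes a destructor, the process terminates.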
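To see the effect in isolation, here is a minimal sketch that uses only the standard library, no Asio. The Tracked type and the destructor_runs_on_worker() function are made up for illustration and are not part of the question's code. The worker holds the last shared_ptr copy, so the destructor runs on the worker thread:

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

// Hypothetical type: records the id of the thread that ran its destructor.
struct Tracked {
    std::thread::id& destroyed_on;
    explicit Tracked(std::thread::id& out) : destroyed_on(out) {}
    ~Tracked() { destroyed_on = std::this_thread::get_id(); }
};

// Returns true when ~Tracked ran on the worker thread and not on the caller.
bool destructor_runs_on_worker() {
    std::thread::id destroyed_on;   // written by ~Tracked
    std::promise<void> released;    // signals that the caller dropped its copy
    std::shared_future<void> go = released.get_future().share();

    auto obj = std::make_shared<Tracked>(destroyed_on);
    std::thread worker([obj, go]() mutable {
        go.wait();    // wait until the caller has released its reference
        obj.reset();  // last owner: ~Tracked runs here, on the worker
    });
    std::thread::id worker_id = worker.get_id();
    assert(obj.use_count() == 2);   // two owners: the caller and the worker

    obj.reset();                    // the caller gives up ownership first
    released.set_value();
    worker.join();                  // also makes destroyed_on safe to read
    return destroyed_on == worker_id &&
           destroyed_on != std::this_thread::get_id();
}
```

Calling destructor_runs_on_worker() returns true here, which is the same situation the Resolver ends up in once main() lets go of its shared_ptr while handlers are still pending.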

And there's more: if you just exit main() while the worker thread is still processing the asynchronous tasks, the completion handlers may run after globals like std::cout have been torn down. So to actually see that Resolver completes the work and destructs, you need to make sure that main() doesn't exit too quickly.

Simplifying:

Now, the following is a simplified example that does show that the asynchronous operations complete (there are still issues):

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

class Resolver : public std::enable_shared_from_this<Resolver> {
    using tcp = boost::asio::ip::tcp;
    using io_service = boost::asio::io_service;

    io_service _svc;
    tcp::resolver resolver { _svc };

    boost::optional<io_service::work> work { _svc };
    std::thread _worker { [this] { event_loop(); } };

    void event_loop() { 
        // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            try {
                _svc.run();
                break; // exited normally
            } catch (std::exception const &e) {
                std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
            } catch (...) {
                std::cerr << "[Resolver] An unexpected error occurred\n";
            }
        }
        std::cout << "EXIT " << __PRETTY_FUNCTION__ << "\n";
    }

  public:
    ~Resolver() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        work.reset();
    }

    using Endpoint = tcp::endpoint;
    using Callback = std::function<void(Endpoint)>;

    void asyncResolve(std::string const& name, Callback fn) {
        auto self = shared_from_this();
        resolver.async_resolve({name, ""}, [self,fn](boost::system::error_code ec, tcp::resolver::iterator it) {
                if (!ec) fn(it->endpoint());
            });
    }
};

void test_handler(Resolver::Endpoint ep) {
    std::cout << "Test: " <<  ep << "\n";
}

int main() {
    {
        auto res = std::make_shared<Resolver>();
        for (auto fqdn : {"stackoverflow.com", "google.com", "localhost"})
            res->asyncResolve(fqdn, test_handler);
    }
    std::cout << "Released shared resolver\n";

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main exit\n";
}

Prints:

void Resolver::event_loop()
Released shared resolver
Test: 151.101.65.69:0
Test: 172.217.17.46:0
Test: 127.0.0.1:0
Resolver::~Resolver()
terminate called without an active exception

Handler tracking: (image not available)

The Remaining Problem

The final problem is that we now do not join the thread at all, so std::thread::~thread() calls std::terminate() on the still-joinable thread (hence "terminate called without an active exception" above). This is a tricky problem:

  • we cannot join() because we might be on that worker thread
  • we cannot detach() because that will create a data race where the worker thread still runs after the destructor completes.

The options are:

  1. call _svc.run() from the destructor instead of join()-ing the thread. This works, but might not be appropriate if the service is also being used for other asynchronous tasks, because as a side effect queued operations might run on the thread that causes the destructor to run.

  2. call join() if we are not on the worker thread, and run() if we are. This is always safe, because run() can be called nested and the operations still run on the worker thread as expected.

  3. just join() and catch the std::system_error exception with error condition std::errc::resource_deadlock_would_occur.
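The third option could be sketched like this, using only the standard library. join_unless_self() and self_join_is_detected() are hypothetical names. The sketch assumes that the implementation actually detects the self-join and reports it as resource_deadlock_would_occur instead of blocking. The standard lists that as an error condition of join(), and as far as I know glibc-based implementations do report it:

```cpp
#include <cassert>
#include <future>
#include <system_error>
#include <thread>

// Hypothetical helper: joins t, unless the call is made from t itself.
// Returns true if t was joined, false if a self-join was detected.
bool join_unless_self(std::thread& t) {
    assert(t.joinable());
    try {
        t.join();
        return true;
    } catch (std::system_error const& e) {
        if (e.code() == std::errc::resource_deadlock_would_occur)
            return false;   // we are running on t: nothing to join
        throw;              // any other error is not ours to swallow
    }
}

// Demo: a worker tries to join itself through the helper.
// Returns true when the self-join was detected (assumed behaviour, see above).
bool self_join_is_detected() {
    std::promise<std::thread*> handle;  // hands the worker a pointer to itself
    std::promise<bool> result;          // reports what the helper returned
    std::future<std::thread*> handle_ready = handle.get_future();
    std::future<bool> joined = result.get_future();

    std::thread worker([&] {
        std::thread* self = handle_ready.get();
        result.set_value(join_unless_self(*self));
    });
    handle.set_value(&worker);

    bool worker_joined_itself = joined.get();  // wait for the attempt to finish
    worker.join();                             // the real join, from outside
    return !worker_joined_itself;
}
```

The downside compared to checking get_id() up front is that it relies on exceptions for a case that is entirely expected, and on the implementation noticing the deadlock at all.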

I'd say the second is the cleanest. But in your simple example there is no issue with the first option, because (a) the destructor will always run on the worker thread if there were any outstanding resolve operations, and (b) if there weren't, the service queue must have been empty, so run() effectively does nothing.

So here's a fix:

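~Resolver() {
    std::cout << __PRETTY_FUNCTION__ << "\n";
    work.reset(); // let run() return once the queue is empty

    event_loop(); // drain any remaining handlers on the current thread
    if (_worker.joinable()) {
       if (_worker.get_id() == std::this_thread::get_id())
           _worker.detach(); // we are the worker: we cannot join ourselves
       else
           _worker.join();
    }
}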

And now the output is

void Resolver::event_loop()
Released shared resolver
Test: 151.101.193.69:0
Test: 216.58.212.238:0
Test: 127.0.0.1:0
Resolver::~Resolver()
void Resolver::event_loop()
Main exit
sehe