
I have a multithreaded application that uses boost::asio and boost::coroutine through its integration in boost::asio. Every thread has its own io_service object. The only state shared between threads is the connection pools, which are locked with a mutex whenever a connection is taken from or returned to a pool. When the pool does not have enough connections, I push an infinite asio::steady_timer into an internal structure of the pool, wait on it asynchronously, and yield from the coroutine function. When another thread returns a connection to the pool, it checks whether there are waiting timers; if so, it takes one from the internal structure, gets its io_service object, and posts a lambda that wakes up the timer (by cancelling it) to resume the suspended coroutine. The application crashes randomly. I tried to investigate the problem with valgrind. It finds some issues, but I cannot understand them because they occur in boost::coroutine and boost::asio internals. Below are fragments of my code and of the valgrind output. Can someone see and explain the problem?

Here is the calling code:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Here is the implementation of the prepare function, which is called in the spawned lambda:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error while preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

The returnQuota function is a pure virtual method of the AvlRequestData class. Its implementation in the TravelportRequestData class, which is used in all my tests, is the following:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Here are the push and pop methods of the connection pool:

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

This is the implementation of coroutineAsyncWait:

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

And finally the first part of the valgrind output:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd

When I run valgrind with a debugger attached, it stops in the following function in trampoline_push.hpp in the boost::coroutine library:

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }
bobeff
  • Please post your `returnQuota` method body. – Galimov Albert Jul 24 '15 at 15:04
  • At a cursory glance, the catch-all suppression in `AvlRequestData::prepare()` is suspicious and violates a Boost.Coroutine requirement (see [here](http://www.boost.org/doc/libs/1_57_0/libs/coroutine/doc/html/coroutine/coroutine/asymmetric.html#coroutine.coroutine.asymmetric.exceptions)). Does the problem persist if you catch `const boost::coroutines::detail::forced_unwind&` and rethrow it? – Tanner Sansbury Jul 24 '15 at 15:23
  • @Tanner Sansbury - thanks for spotting this. I added a rethrow of the `forced_unwind` exception, but the problem persists. – bobeff Jul 26 '15 at 15:59
  • @PSIAlt - I added the `returnQuota` function in the question. – bobeff Jul 26 '15 at 16:13
  • I posted another [question](http://stackoverflow.com/questions/31639888/whats-wrong-with-this-boostasio-and-boostcoroutine-usage-pattern) with *proof of concept* to simulate the problem in isolation. – bobeff Jul 26 '15 at 17:34
  • It seems that the *valgrind* errors occur because the **BOOST_USE_VALGRIND** macro is not defined. The program runs without *valgrind* errors when the macro is defined. – bobeff Jul 28 '15 at 14:00
  • For clarification, does the program run without error when either defining `BOOST_USE_VALGRIND` or running without valgrind? I read the original question as though the program was crashing, and only afterwards was valgrind used to investigate. – Tanner Sansbury Jul 28 '15 at 16:02
  • @Tanner Sansbury The program only runs without *valgrind* errors. The reason for the crashes is still unknown. It may be totally unrelated to *coroutines* or any of the *boost* libraries used. The program crashes only in production, and the hope was that clearing the *valgrind* errors would also remove the cause of the crashes. It seems this isn't the case. There is still a chance that the cause is the missing rethrow of the `forced_unwind` exception that you spotted, but a build with this fix has yet to be deployed to production. For now we are not able to reproduce the crash in house. – bobeff Jul 28 '15 at 16:31
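
For readers following the comments, the rethrow discussed above can be sketched roughly as follows. To keep the sketch free of Boost headers, `forced_unwind` here is a hypothetical stand-in for `boost::coroutines::detail::forced_unwind`, and `guarded` is an illustrative helper that is not part of the original code. The only point is the ordering of the handlers: the unwind signal is caught first and rethrown, so the catch-all below it never swallows it.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-in for boost::coroutines::detail::forced_unwind,
// used only so that this sketch compiles without Boost.
struct forced_unwind {};

// Illustrative helper (not from the original code): runs `body`, reports
// ordinary failures as text, and lets the unwind signal propagate.
template <class Body>
std::string guarded(Body body)
{
    try
    {
        body();
        return "ok";
    }
    catch (const forced_unwind&)
    {
        throw; // must reach the coroutine machinery; never suppress it
    }
    catch (const std::exception& e)
    {
        return std::string("error: ") + e.what();
    }
    catch (...)
    {
        return "unknown error";
    }
}
```

In `AvlRequestData::prepare()` the same shape would mean adding the `forced_unwind` handler before the two existing catch blocks, which still log and call `returnQuota()` for every other exception.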

1 Answer


Ultimately I found that when objects need to be deleted, boost::asio doesn't handle it gracefully without proper use of shared_ptr and weak_ptr. When crashes do occur, they are very difficult to debug, because it's hard to see what the io_service queue is doing at the time of failure.

After recently building a fully asynchronous client architecture and running into random crashes, I have a few tips to offer. Unfortunately, I cannot know whether these will solve your issues, but hopefully they provide a good start in the right direction.

Boost Asio Coroutine Usage Tips

  1. Use boost::asio::asio_handler_invoke instead of io_service.post():

    auto& ioService = timer->get_io_service();

    ioService.post([timer](){ timer->cancel(); });

    Using post/dispatch within a coroutine is usually a bad idea. Always use asio_handler_invoke when you are called from a coroutine. In this case, however, you can probably safely call timer->cancel() without posting it to the message loop anyway.

  2. Your timers do not appear to use shared_ptr objects. Regardless of what is going on in the rest of your application, there is no way to know for sure when these objects should be destroyed. I would highly recommend using shared_ptr objects for all of your timer objects. Also, any pointer to class methods should use shared_from_this() as well. Using a plain this can be quite dangerous if this is destructed (on the stack) or goes out of scope somewhere else in a shared_ptr. Whatever you do, do not use shared_from_this() in the constructor of an object!

    If you're getting a crash when a handler within the io_service is being executed, but part of the handler is no longer valid, this is a seriously difficult thing to debug. The handler object that is pumped into the io_service includes any pointers to timers, or pointers to objects that might be necessary to execute the handler.

    I highly recommend going overboard with shared_ptr objects wrapped around any asio classes. If the problem goes away, then it's likely an order-of-destruction issue.

  3. Is the failure address location on the heap somewhere or is it pointing to the stack? This will help you diagnose whether it's an object going out of scope in a method at the wrong time, or if it is something else. For instance, this proved to me that all of my timers must become shared_ptr objects even within a single threaded application.
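
As a rough illustration of tip 2, here is a minimal sketch of the two capture styles. It deliberately avoids Boost: `HandlerQueue` is a hypothetical stand-in for an io_service, and `Request` with its two methods is an invented example class, not anything from the question. A handler that captures `shared_from_this()` keeps its object alive until the handler has run, while a handler that captures a `weak_ptr` checks whether the object still exists and skips the work otherwise.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service: handlers are queued now, run later.
class HandlerQueue
{
public:
    void post(std::function<void()> handler)
    {
        _handlers.push_back(std::move(handler));
    }

    void run()
    {
        while (!_handlers.empty())
        {
            auto handler = std::move(_handlers.front());
            _handlers.pop_front();
            handler();
        }
    }

private:
    std::deque<std::function<void()>> _handlers;
};

// Invented example class; it must be owned by a shared_ptr, and
// shared_from_this() must not be called from its constructor.
class Request : public std::enable_shared_from_this<Request>
{
public:
    Request(std::string id, std::vector<std::string>& log)
        : _id(std::move(id)), _log(log) {}

    // Strong capture: the queued handler extends the request's lifetime.
    void scheduleOwning(HandlerQueue& queue)
    {
        auto self = shared_from_this();
        queue.post([self] { self->_log.push_back("done " + self->_id); });
    }

    // Weak capture: the handler does its work only if the request survives.
    void scheduleObserving(HandlerQueue& queue)
    {
        std::weak_ptr<Request> weak = shared_from_this();
        std::vector<std::string>& log = _log;
        queue.post([weak, &log] {
            if (auto self = weak.lock())
                log.push_back("done " + self->_id);
            else
                log.push_back("skipped");
        });
    }

private:
    std::string _id;
    std::vector<std::string>& _log;
};
```

If both requests are released before `run()` is called, the owning handler still completes its work and the observing handler records that it was skipped; a handler capturing a plain `this` would instead touch a destroyed object.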
havocheavy
  • Can you please elaborate on why one should always use `asio_handler_invoke()` instead of post/dispatch within a coroutine? Due to the potentially desirable differences in execution, I am having a hard time reaching this conclusion. My rule of thumb is to use `shared_ptr` when shared ownership is desired or the asynchronous operation should extend the lifetime of the object. For me, using `shared_ptr` outside of those two cases can be a strong indication that the asynchronous call chain design is unnecessarily complicated. Also, the coroutine stack is allocated within the free space. – Tanner Sansbury Aug 24 '15 at 16:33
  • You might actually be right about these things. The issue is usually when everything needs to be destructed. My use scenario had issues in that case particularly. – havocheavy Aug 25 '15 at 20:11
  • I do want to say that I'm not sure I understand the nuances of asio_handler_invoke() vs post/dispatch, especially when executing within a coroutine or strand. I can't actually point to that as a direct fix, but I figured it was good practice to use the io_service associated with a given coroutine. – havocheavy Aug 25 '15 at 20:20