I have a TCP client that polls a server for an answer, with a deadline so that the client does not block if the server cannot be reached. The problem is that async_wait never seems to call its handler, so the client blocks when it fails to connect. Every call to tcpPoll runs in its own thread (which is why I create a new io_service for each call), but it doesn't work even without multithreading. Also, a NetworkEntity object can call tcpPoll more than once during its lifetime.

My question: what is preventing the deadline_timer from calling its handler, and how can I fix it?

Here is the relevant code, which works as long as nothing fails (connect, write, read). Sorry if it's a bit long:

void NetworkEntity::stop()
{
    stopped_ = true;
    //close socket
    //cancel timeout
}

void NetworkEntity::check_timeout(const boost::system::error_code& error)
{
    if (stopped_)
        return;
    if (timeout_.expires_at() <= boost::asio::deadline_timer::traits_type::now())
    {
        stop();
        timeout_.expires_at(boost::posix_time::pos_infin);
        std::cout << address_ << " timed out\n";
    }
    timeout_.async_wait(boost::bind(&NetworkEntity::check_timeout, this, boost::asio::placeholders::error));
}

std::vector<std::string> NetworkEntity::tcpPoll(const char* message, const char endOfMessage)
{
    boost::asio::io_service io_service;
    stopped_ = false;
    timeout_.expires_from_now(boost::posix_time::seconds(TIMEOUT_));
    timeout_.async_wait(boost::bind(&NetworkEntity::check_timeout, this, boost::asio::placeholders::error));
    tcp::resolver resolver(io_service);
    start_connect(&io_service, resolver.resolve(tcp::resolver::query(address_, port_)), message, endOfMessage);
    io_service.run();
    //retrieve answer from class
    //return answer
}

void NetworkEntity::start_connect(boost::asio::io_service* io_service, tcp::resolver::iterator endpoint_iterator, const std::string message, const char endOfMessage)
{
    socket_.reset(new tcp::socket(*io_service));
    socket_->async_connect(endpoint_iterator->endpoint(),
        boost::bind(&NetworkEntity::handle_connect, this, io_service, boost::asio::placeholders::error, endpoint_iterator, message, endOfMessage));
}

void NetworkEntity::handle_connect(boost::asio::io_service* io_service, const boost::system::error_code& err, tcp::resolver::iterator endpoint_iterator, const std::string message, const char endOfMessage)
{
    if(stopped_)
        return;
    if (err)
    {
        std::cout << "Connect error: " << err.message() << "\n";
        stop();
    }
    else
    {
        start_write(message, endOfMessage);
    }
}

void NetworkEntity::start_write(const std::string message, const char endOfMessage)
{
    std::ostream request_stream(&request_);
    request_stream << message;
    boost::asio::async_write(*socket_, request_,
        boost::bind(&NetworkEntity::handle_write, this, boost::asio::placeholders::error, endOfMessage));
}

void NetworkEntity::handle_write(const boost::system::error_code& error, const char endOfMessage)
{
    if (stopped_)
        return;
    if (!error)
    {
        //sleep for 500ms to give the receiver time to process the info (had a bug on this one)
        start_read(endOfMessage);
    }
    else
    {
        std::cout << "write error : " << error.message() << "\n";
        stop();
    }
}

void NetworkEntity::start_read(const char endOfMessage)
{
    boost::asio::async_read_until(*socket_, answer_, endOfMessage,
        boost::bind(&NetworkEntity::handle_read, this, boost::asio::placeholders::error));
}

void NetworkEntity::handle_read(const boost::system::error_code& error)
{
    if (stopped_)
        return;
    if (error)
    {
        std::cout << "read error : " << error.message() << "\n";
        stop();
    }
    else
    {
        stop();
    }
}
M__
  • Pleeeeeeeeeeeeeease make your sample self-contained next time? The error could have been obvious that way. This way, we've spent time 'making' up io services, deadline_timers, address/port values, "guessing" that socket would be shared_ptr to one (why?) and that request_/answer_ are (probably) streambufs. And, worst of all, we implement those according to our own experience, not necessarily how you actually did it (making it harder to see what tripped you up) – sehe Jun 19 '14 at 07:50
  • Noted! I will next time – M__ Jun 19 '14 at 12:29

1 Answer

I think you are mixing up multiple instances of io_service.

I think this because your code never shows how timeout_ is initialized, and the io_service you use for the connection only exists inside your tcpPoll function. That leads me to believe you are accidentally registering the deadline timer on a separate io_service that you never run, in which case its handler can never be called.

Here's a version that actually works. Notes:

  1. it does away with the stopped_ boolean, which is unnecessary and bound to confuse
  2. see the new implementations of check_timeout and stop for how to detect the various program flows
  3. the full, self-contained code is only a few lines longer than what you posted in the question
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

using tcp = boost::asio::ip::tcp;

struct NetworkEntity {

    boost::asio::io_service io_service;
    boost::asio::deadline_timer timeout_{io_service};
    std::string address_ = "localhost";
    std::string port_    = "6767";
    int TIMEOUT_         = 3;
    boost::shared_ptr<tcp::socket> socket_;
    boost::asio::streambuf request_, answer_;

    void stop()
    {
        if (socket_)
        {
            socket_->cancel();
            socket_->close();
        }
        timeout_.cancel();
        io_service.stop();
    }

    void check_timeout(const boost::system::error_code& error)
    {
        // operation_aborted means stop() cancelled the timer after the
        // exchange finished; only a real expiry counts as a timeout.
        if (error == boost::asio::error::operation_aborted)
            return;

        std::cout << address_ << " timed out\n";
        // No need to re-arm the timer: tcpPoll sets it once per call.
        stop();
    }

    std::vector<std::string> tcpPoll(const char* message, const char endOfMessage)
    {
        timeout_.expires_from_now(boost::posix_time::seconds(TIMEOUT_));
        timeout_.async_wait(boost::bind(&NetworkEntity::check_timeout, this, boost::asio::placeholders::error));

        tcp::resolver resolver(io_service);
        start_connect(&io_service, resolver.resolve(tcp::resolver::query(address_, port_)), message, endOfMessage);

        io_service.run();
        //retrieve answer from class
        //return answer
        std::ostringstream oss;
        oss << &answer_;
        return { oss.str() };
    }

    void start_connect(boost::asio::io_service* io_service, tcp::resolver::iterator endpoint_iterator, const std::string message, const char endOfMessage)
    {
        socket_.reset(new tcp::socket(*io_service));
        socket_->async_connect(endpoint_iterator->endpoint(),
                boost::bind(&NetworkEntity::handle_connect, this, io_service, boost::asio::placeholders::error, endpoint_iterator, message, endOfMessage));
    }

    void handle_connect(boost::asio::io_service* io_service,
            const boost::system::error_code& err,
            tcp::resolver::iterator endpoint_iterator,
            const std::string message,
            const char endOfMessage)
    {
        if (err)
        {
            std::cout << "Connect error: " << err.message() << "\n";
            stop();
        }
        else
        {
            start_write(message, endOfMessage);
        }
    }

    void start_write(const std::string message, const char endOfMessage)
    {
        std::ostream request_stream(&request_);
        request_stream << message;
        boost::asio::async_write(*socket_, request_,
                boost::bind(&NetworkEntity::handle_write, this, boost::asio::placeholders::error, endOfMessage));
    }

    void handle_write(const boost::system::error_code& error, const char endOfMessage)
    {
        if (!error)
        {
            //sleep for 500ms to give the receiver time to process the info (had a bug on this one)
            start_read(endOfMessage);
        }
        else
        {
            std::cout << "write error : " << error.message() << "\n";
            stop();
        }
    }

    void start_read(const char endOfMessage)
    {
        boost::asio::async_read_until(*socket_, answer_, endOfMessage,
                boost::bind(&NetworkEntity::handle_read, this, boost::asio::placeholders::error));
    }

    void handle_read(const boost::system::error_code& error)
    {
        if (error)
        {
            std::cout << "read error : " << error.message() << "\n";
        }
        stop();
    }
};

int main()
{
    NetworkEntity ne;
    for (auto& s : ne.tcpPoll("this is my request", '\n'))
    {
        std::cout << "Line: '" << s << "'\n";
    }
}
sehe
  • I agree with sehe; you are not far from being an asio expert, M__. The asio learning curve is a bit steep; this post [link](http://stackoverflow.com/questions/15568100/confused-when-boostasioio-service-run-method-blocks-unblocks?rq=1) may help you. – Jean Davy Jun 19 '14 at 08:39
  • I think you are in fact right, I was initializing `timeout_` in the constructor (which I didn't show) with my main `io_service`. I didn't think about that when I changed from one `io_service` to one per thread. Thanks! – M__ Jun 19 '14 at 12:27