
I'm trying to write a synchronous wrapper method around async_read that allows non-blocking reads on a socket. Following several examples from around the internet, I have developed a solution that seems almost right but does not work.

The class declares these relevant attributes and methods:

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;

        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);

        ...
};

The method async_read_helper encapsulates all the complexity, while the other two, handle_read and handle_timeout, are just the event handlers. Here is the implementation of the three methods:

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}

size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;

    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));

    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }

    return _bytes_transferred;
}

My main question is: why does this work with a loop around _io_service->poll_one(), but not without a loop, calling _io_service->run_one() instead? I would also like to know whether it looks correct to anyone more used to working with Boost and Asio. Thank you!


FIX PROPOSAL #1

Following Jonathan Wakely's comments, the loop could be replaced with a single call to _io_service->run_one(), followed by a call to _io_service->reset() once the operations have finished. It should look like this:

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}

_io_service->reset();

After some testing, I found that this solution alone does not work. The handle_timeout method is called repeatedly with the error code operation_aborted. How can these calls be stopped?

FIX PROPOSAL #2

The answer by twsansbury is accurate and well grounded in the documentation. Applying it leads to the following code within async_read_helper:

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();

and the following change to the handle_read method:

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}

This solution has proved solid and correct during testing.

Asked by yeyeyerman (edited by Community)
  • The mutex locking/unlocking looks wrong, you should use `boost::lock_guard` to lock and have it unlock automatically in the destructor. – Jonathan Wakely Jun 02 '12 at 00:06
  • @JonathanWakely that solution would be more Boost-like. It's pthread's fault... :) But, as I understand it, the outcome of both implementations should be the same. – yeyeyerman Jun 02 '12 at 00:16
  • And since you edited it you don't have a mutex protecting the reads of `_message_received` and `_timeout_triggered`. Also I assume `pan_tilt_EN304_client` should be `communications_client`? – Jonathan Wakely Jun 02 '12 at 00:17
  • Are you calling `_io_service->reset()` in the version using `run_one()` that doesn't work? – Jonathan Wakely Jun 02 '12 at 00:22
  • @JonathanWakely sorry, I'm fixing the code while you're trying to answer. 1. It seems that the extra lock/unlock around the attributes is not necessary at that point, because there are no async operations running then. 2. In the `run_one` version I wasn't calling `_io_service->reset()`. That makes sense, because the problem was that it kept reading the same bytes all the time. – yeyeyerman Jun 02 '12 at 00:29
  • @JonathanWakely according to the documentation, it seems that `io_service->reset()` should also be called after a call to `io_service->poll_one()` http://www.boost.org/doc/libs/1_49_0/doc/html/boost_asio/reference/io_service/reset.html Should I add that inside the loop? – yeyeyerman Jun 02 '12 at 00:49
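Jonathan Wakely's two mutex comments (use a scoped lock guard instead of explicit lock()/unlock() calls, and also lock when reading the flags) can be illustrated with a minimal, Boost-free sketch. It assumes std::mutex, std::lock_guard and std::error_code as stand-ins for boost::mutex, boost::lock_guard and boost::system::error_code; the class name read_state and its accessors are hypothetical, while the handler bodies mirror handle_timeout() and handle_read() from the question.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <system_error>

// Hypothetical stand-in for the shared state of communications_client.
class read_state {
public:
  // Mirrors handle_timeout(): ignore errors such as a cancelled wait.
  void on_timeout(const std::error_code& error) {
    if (error) return;
    std::lock_guard<std::mutex> lock(mutex_);  // unlocked when `lock` leaves scope
    timeout_triggered_ = true;
    error_ = std::make_error_code(std::errc::timed_out);
  }

  // Mirrors handle_read().
  void on_read(const std::error_code& error, std::size_t bytes_transferred) {
    std::lock_guard<std::mutex> lock(mutex_);
    message_received_ = true;
    error_ = error;
    bytes_transferred_ = bytes_transferred;
  }

  // Reads of the shared members take the same lock as the writes.
  bool message_received() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return message_received_;
  }
  bool timeout_triggered() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return timeout_triggered_;
  }
  std::error_code error() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return error_;
  }
  std::size_t bytes_transferred() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return bytes_transferred_;
  }

private:
  mutable std::mutex mutex_;  // mutable so the const accessors can lock it
  bool timeout_triggered_ = false;
  bool message_received_ = false;
  std::error_code error_;
  std::size_t bytes_transferred_ = 0;
};
```

Besides being shorter, the guard also releases the mutex if the guarded code throws, which explicit lock()/unlock() pairs do not.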

1 Answer


The main difference between io_service::run_one() and io_service::poll_one() is that run_one() will block until a handler is ready to run, whereas poll_one() will not wait for any outstanding handlers to become ready.

Assuming the only outstanding handlers on _io_service are handle_timeout() and handle_read(), run_one() does not require a loop, because it only returns once either handle_timeout() or handle_read() has run. On the other hand, poll_one() requires a loop: poll_one() returns immediately when neither handle_timeout() nor handle_read() is ready to run, so without the loop async_read_helper() would return before either operation had completed.

The main issue with the original code, as well as with fix proposal #1, is that there are still outstanding handlers in the io_service when async_read_helper() returns. On the next call to async_read_helper(), the next handler to be invoked will be a handler from the previous call. The io_service::reset() method only allows the io_service to resume running from a stopped state; it does not remove any handlers already queued in the io_service. To account for this behavior, use a loop to consume all of the handlers from the io_service. Once all handlers have been consumed, exit the loop and reset the io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timeout, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occurred, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

From the caller's perspective, this form of timeout is synchronous, as run_one() blocks. However, the calling thread is still doing the work of running handlers within the I/O service. An alternative is to use Boost.Asio's support for C++ futures and wait on a future with a timeout. This code can be easier to read, but it requires at least one other thread to be processing the I/O service, as the thread waiting on the timeout is no longer processing the I/O service:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then read_result.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}
Answered by Tanner Sansbury
  • Doesn't the ``cancel()`` methods prevent the outstanding handlers from being called? – yeyeyerman Jun 04 '12 at 09:40
  • No, [`cancel()`](http://www.boost.org/doc/libs/1_49_0/doc/html/boost_asio/reference/basic_deadline_timer/cancel/overload1.html) forces the completion of any outstanding handlers with an _error_ of `boost::asio::error::operation_aborted`. Handlers are only removed from a service in the service's destructor. Also, `cancel()` will not affect the case where both `handle_timeout()` and `handle_read()` succeed without error, such as when the timeout occurs immediately after data is received on the socket. – Tanner Sansbury Jun 04 '12 at 12:12
  • If the handlers are being executed sequentially, is there any need for the lock/unlock clauses? – yeyeyerman Jun 05 '12 at 15:57
  • Based on the posted code, it is safe to remove the lock and unlocks. However, be very cautious. Member variables, such as `_error` and `_bytes_transferred` are subject to concurrency issues from other member functions in `communications_client` or derived classes due to their `protected` accessibility. Consider reducing access with `private` or refactoring the desired behavior into an `async_read_timeout()` free function that mimics `async_read` concepts. – Tanner Sansbury Jun 05 '12 at 19:11
  • @TannerSansbury do you know whether the above applies equally to a UDP socket? I'm writing a TFTP client and server and I need timeout support when reading from a remote endpoint. Also, if the timeout occurs, I do not want to close the socket, as I will reuse it to tell the remote endpoint to resend its packet (using the TFTP ACK packet). Also, can you point to a full communications_client class, including the constructors etc.? – johnco3 May 13 '15 at 17:42
  • @johnco3 This form of timeout is applicable to all Boost.Asio I/O objects (sockets, serial ports, timers, etc.). – Tanner Sansbury May 14 '15 at 14:39