I am writing a UDP server app with Boost.Asio that should listen on a socket for 5 seconds and, if no datagram is received in that time, move on to do other things.

Inspired by some answers I decided to try the solution based on std::future.

The problem is that the call to wait_for() always times out, as if no data had been received. However, if I set a breakpoint on the line that executes after the timeout and inspect the variables, I see that the buffer contains the received datagram and that the remote_endpoint object contains the address of the client. In other words, the socket receive works as expected, but the std::future never becomes ready. Why?

Here is my test server code:

#include <future>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>

using boost::asio::ip::udp;

int main()
{
    try
    {
        boost::asio::io_service io_service;
        udp::socket socket(io_service, udp::endpoint(udp::v4(), 10000));
        char recv_buf[8];

        for (;;)
        {
            ZeroMemory(recv_buf, 8);
            udp::endpoint remote_endpoint;
            std::future<std::size_t> recv_length;

            recv_length = socket.async_receive_from(
                boost::asio::buffer(recv_buf), 
                remote_endpoint, 
                0, 
                boost::asio::use_future);

            if (recv_length.wait_for(
                std::chrono::seconds(5)) == std::future_status::timeout)
            {
                printf("time out. Nothing received.\n");
            }
            else
            {
                printf("received something: %s\n", recv_buf);
            }
        }
    }
    catch (std::exception& e)
    {
        printf("Error: %s\n", e.what());
    }
    return 0;
}

I have been banging my head on this one for a while so any help would be appreciated. I am on Windows 10 with Visual Studio 2015.

Here is my test client code (in Python, sorry).

import socket
import time

HOST = "server"           # The remote host
PORT = 10000              # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    # Resolve to an IPv4 address so that it matches the AF_INET socket.
    address = socket.getaddrinfo(HOST, PORT, socket.AF_INET)[0][-1]

    while True:
        s.sendto(b"ping\0", address)  # sendto() requires bytes in Python 3
        time.sleep(1)
jeancf
  • The linked question and answer mention needing to run the `io_service`: "as the calling thread will be blocked waiting for the future, at least one other thread must be processing the `io_service` to allow the async [...] operation to progress and fulfill the promise." – Tanner Sansbury Nov 24 '16 at 13:18

4 Answers

3

You are not calling the io_service object's run() method, so Asio never executes any completion handlers. Create a thread that calls run() and then try again.

Orçun Çolak
0

What you are doing is a mix of asynchronous and synchronous operations, which does not work:

  • You are using the asynchronous async_receive_from operation, which runs on an Asio event loop (io_service) and finishes whenever something is received. Normally it calls a callback on completion; if you pass use_future, it fulfills the future instead. Note that this happens in the thread that calls io_service.run().
  • You are using the future in a synchronous way. This blocks the current thread until the future is fulfilled.
  • If the future is supposed to be fulfilled from the same thread that is blocked waiting for it, it can obviously never be fulfilled.

Possible steps to resolve this:

  • Use Asio's blocking operations together with a timeout. This does exactly what you are trying to achieve with the future.
  • Use the future's then() method to attach a continuation instead of blocking on it. This will not work with std::future, since then() is only part of the Concurrency TS (std::experimental::future) and was not adopted into C++17. However, Boost futures support it. You still need to call io_service.run() on the main thread and split your program into the phases before and after the callback.
  • Run Asio and its event loop in a background thread if you need to block on the future.
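To illustrate the last option without depending on Boost, here is a minimal sketch that uses only the standard library. MiniLoop, async_fake_receive() and wait_with_background_loop() are hypothetical names invented for this example; MiniLoop is a simplified stand-in for an event loop and is not Asio's io_service. The point it shows is that the waiting thread and the thread that runs the handlers are different, so wait_for() can return ready.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for an event loop: handlers are queued by post()
// and are only executed by whichever thread calls run().
class MiniLoop {
public:
    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(handler));
        }
        cv_.notify_one();
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Blocks and executes handlers as they arrive. Returns once stop() has
    // been called and the queue is empty.
    void run() {
        for (;;) {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopped_ || !queue_.empty(); });
                if (queue_.empty()) return;
                handler = std::move(queue_.front());
                queue_.pop_front();
            }
            handler();  // executed outside the lock
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    bool stopped_ = false;
};

// Fake "asynchronous receive": the queued completion handler fulfills the future.
std::future<std::size_t> async_fake_receive(MiniLoop& loop) {
    auto promise = std::make_shared<std::promise<std::size_t>>();
    std::future<std::size_t> result = promise->get_future();
    loop.post([promise] { promise->set_value(5); });  // pretend 5 bytes arrived
    return result;
}

// Returns true if the future became ready within the timeout.
bool wait_with_background_loop() {
    MiniLoop loop;
    std::thread worker([&loop] { loop.run(); });  // dedicated loop thread
    std::future<std::size_t> length = async_fake_receive(loop);
    bool ready = length.wait_for(std::chrono::seconds(5)) ==
                 std::future_status::ready;
    loop.stop();
    worker.join();
    return ready;
}
```

Calling wait_with_background_loop() from main() should return true, because the worker thread executes the handler while the calling thread waits. If the worker thread were removed, nothing would ever run the handler and the same wait_for() call would time out.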
Matthias247
  • Could you please expand upon how one would use Asio's blocking operations with timeouts. I was under the impression that timeouts necessitate asynchronous operations. – Tanner Sansbury Nov 24 '16 at 13:31
  • I guessed there was an overload which took a timeout but that doesn't seem to be the case. But there are other options. E.g. you can set the read/write timeout options directly on the socket. Then the blocking operations will utilize that. That's described in the accepted answer here: http://stackoverflow.com/questions/291871/how-to-set-a-timeout-on-blocking-sockets-in-boost-asio – Matthias247 Nov 24 '16 at 13:43
  • Alternatively, Boost itself shows a cross-platform approach for blocking reads, which however uses the async options and io_service.run_one() under the hood: http://www.boost.org/doc/libs/1_52_0/doc/html/boost_asio/example/timeouts/blocking_tcp_client.cpp That looks sane, but I wouldn't use it if you also have asynchronous operations running on the same io_service (because then the handlers for these might be called during the blocking wait). – Matthias247 Nov 24 '16 at 13:45
  • The `SO_RCVTIMEO` and `SO_SNDTIMEO` socket options are not effective with synchronous Asio operations. See [here](https://stackoverflow.com/questions/30410265). – Tanner Sansbury Nov 24 '16 at 14:29
  • That question is about using these with async operations, not with the synchronous ones which should be simple wrappers around the OS read/write calls. – Matthias247 Nov 24 '16 at 15:44
  • The answer to that question details why socket timeout options are not effective for either asynchronous or synchronous Asio operations. The synchronous operations are not simple wrappers around the OS: if the synchronous Asio read operation is initiated on a native blocking socket, and the `SO_RCVTIMEO` timeout occurs for the system call, then Asio will block the calling thread polling on the file descriptor to be readable. – Tanner Sansbury Nov 24 '16 at 16:30
0

For an asynchronous operation, the underlying I/O and the execution of a completion handler are discrete steps. In this case, the I/O has completed, but the user code never runs the io_service, so the completion handler that would set recv_length's value is never executed. To resolve this, run the io_service.


There are a few details contributing to the observation:

  • when an asynchronous operation is initiated, if it can complete without blocking, then it will do so and its completion handler will be queued into io_service as-if by io_service.post()
  • when using boost::asio::use_future, the std::future's value is set within the asynchronous operation's completion handler
  • handlers posted to the io_service are only invoked in threads that are currently invoking the poll(), poll_one(), run(), and run_one() member functions on the io_service

In the context of the question, when

recv_length = socket.async_receive_from(
  boost::asio::buffer(recv_buf), 
  remote_endpoint, 
  0, 
  boost::asio::use_future);

is initiated and data is available to be read (socket.available() > 0), then both remote_endpoint and recv_buf will be populated with the correct data within the initiating async_receive_from() function. The completion handler that will set recv_length's value is posted to the io_service. However, as the code does not process the io_service, recv_length's value is never set. Hence, recv_length.wait_for() will always result in a timeout status.
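The sequence described above can be reproduced in miniature with the standard library alone. This is only a sketch of the mechanism, not Asio's implementation: HandlerQueue, fake_async_receive(), Observation and observe() are hypothetical names made up for this example, and the "datagram" is a hard-coded string. The buffer is filled when the operation is initiated, while the future only becomes ready once the queued handler is run.

```cpp
#include <chrono>
#include <cstddef>
#include <cstring>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded handler queue standing in for an event loop.
struct HandlerQueue {
    std::queue<std::function<void()>> handlers;

    void post(std::function<void()> h) { handlers.push(std::move(h)); }

    // Runs every queued handler on the calling thread and returns the count.
    std::size_t poll() {
        std::size_t count = 0;
        while (!handlers.empty()) {
            std::function<void()> h = std::move(handlers.front());
            handlers.pop();
            h();
            ++count;
        }
        return count;
    }
};

// Models an operation that can complete without blocking: the buffer is
// filled immediately, but the future is only fulfilled by the queued handler.
std::future<std::size_t> fake_async_receive(HandlerQueue& q, char* buf,
                                            std::size_t size) {
    const char datagram[] = "ping";  // 5 bytes including the terminator
    std::size_t n = sizeof(datagram) < size ? sizeof(datagram) : size;
    std::memcpy(buf, datagram, n);
    auto promise = std::make_shared<std::promise<std::size_t>>();
    std::future<std::size_t> result = promise->get_future();
    q.post([promise, n] { promise->set_value(n); });
    return result;
}

struct Observation {
    bool buffer_filled_before_poll;
    bool timed_out_before_poll;
    bool ready_after_poll;
};

Observation observe() {
    HandlerQueue q;
    char buf[8] = {};
    std::future<std::size_t> length = fake_async_receive(q, buf, sizeof(buf));

    Observation o;
    // The data is already visible in the buffer...
    o.buffer_filled_before_poll = std::strcmp(buf, "ping") == 0;
    // ...but nothing has run the handler yet, so waiting times out.
    o.timed_out_before_poll =
        length.wait_for(std::chrono::milliseconds(50)) ==
        std::future_status::timeout;
    // Processing the queue runs the handler and fulfills the future.
    q.poll();
    o.ready_after_poll = length.wait_for(std::chrono::seconds(0)) ==
                         std::future_status::ready;
    return o;
}
```

All three fields of the value returned by observe() should be true, which mirrors what the debugger showed in the question: a populated buffer next to a future that reports a timeout.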


The official futures example creates an additional thread that is dedicated to processing the I/O service and waits on std::future from within a thread that is not processing the I/O service:

// We run the io_service off in its own thread so that it operates
// completely asynchronously with respect to the rest of the program.
boost::asio::io_service io_service;
boost::asio::io_service::work work(io_service);
std::thread thread([&io_service](){ io_service.run(); });

...

std::future<std::size_t> send_length =
  socket.async_send_to(..., boost::asio::use_future);

// Do other things here while the send completes.

send_length.get(); // Blocks until the send is complete. Throws any errors.

io_service.stop();
thread.join();
Tanner Sansbury
  • Very nice explanation. I wish this was the solution. I added the code to create the thread and start the io_service at the beginning, as well as the `stop()` and `join()` at the end but the behavior is still the same as described originally. – jeancf Nov 25 '16 at 21:07
  • Yes, I did. I also tried to use `recv_length.get()` and `recv_length.wait()`, and both work as expected: they block until data is received. It is only `wait_for()` that misbehaves. – jeancf Nov 25 '16 at 21:53
  • @jeancf Your code may be invoking undefined behavior, as the code fails to meet the requirement that both the buffer and endpoint must remain valid until `async_receive_from()`'s completion handler is called. This [example](http://coliru.stacked-crooked.com/a/4290cf5390fc3df3) demonstrates proper usage. Furthermore, if `get()` and `wait()` work as expected, then I would verify that `wait_for()` is properly supported in your environment (at one point, Windows had a bug with the return status for `wait_for()`). – Tanner Sansbury Nov 25 '16 at 22:32
0

I found the solution. To wrap this up, here is what needs to be done. My initial code needs two modifications.

(1) Add two lines at the beginning to run the io_service in a separate thread, so that the completion handler that fulfills the future can execute (as suggested by Tanner Sansbury):

boost::asio::io_service::work work(io_service);
std::thread thread([&io_service](){ io_service.run(); });

(2) Call socket.cancel() when wait_for() times out. If the pending receive is not cancelled, it stays outstanding and the socket keeps blocking despite the renewed calls to wait_for() (solution received on Boost's mailing list).

Here is the amended code for reference:

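#include <chrono>
#include <cstdio>
#include <cstring>
#include <future>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>

using boost::asio::ip::udp;

int main()
{
    boost::asio::io_service io_service;
    // Keeps run() from returning while no operation is pending.
    boost::asio::io_service::work work(io_service);
    // Completion handlers, which fulfill the futures, execute on this thread.
    std::thread thread([&io_service](){ io_service.run(); });

    try
    {
        udp::socket socket(io_service, udp::endpoint(udp::v4(), 10000));

        // Both must remain valid until each receive operation has completed.
        char recv_buf[8];
        udp::endpoint remote_endpoint;

        for (;;)
        {
            std::memset(recv_buf, 0, sizeof(recv_buf));
            std::future<std::size_t> recv_length = socket.async_receive_from(
                boost::asio::buffer(recv_buf),
                remote_endpoint,
                0,
                boost::asio::use_future);

            if (recv_length.wait_for(
                std::chrono::seconds(5)) == std::future_status::timeout)
            {
                printf("time out. Nothing received.\n");
                socket.cancel();
                // Let the cancelled operation finish before reusing the buffer.
                recv_length.wait();
            }
            else
            {
                // get() rethrows a receive error as an exception.
                printf("received something: %.*s\n",
                    static_cast<int>(recv_length.get()), recv_buf);
            }
        }
    }
    catch (std::exception& e)
    {
        printf("Error: %s\n", e.what());
    }

    // Stop the event loop and join the thread before it is destroyed.
    io_service.stop();
    thread.join();
    return 0;
}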

Thank you all for your help.

jeancf