Yes, I know there have been a few questions about time_out in boost::asio. My problem might be too simple for the asio guys to solve here.

I am using boost::asio on TCP protocol to read data over a network continuously in a loop as fast as I can.

The following function ReadData() is called continuously from a worker std::thread in a while loop.

std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) {

 boost::system::error_code error_code;
 buffer.resize(size_to_read);

 // Receive body (blocks until the buffer is full or an error occurs)
 std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code);

 if (bytes_read == 0) {
   // log error
   return 0;
 }

 return bytes_read;
}

It works fine. Returns the data. All is well.

All I want is to use a time_out for boost::asio::read. I learnt that I need to use boost::asio::async_read together with a timer's async_wait (for example boost::asio::deadline_timer::async_wait) for the time_out technique to work.

One Boost example suggests using boost::asio::async_read_until.

Should I use boost::asio::async_read or boost::asio::async_read_until?

I don't mind whether I use boost::asio::async_read, boost::asio::async_read_until or boost::asio::read. What matters is that the read is started and completed within the call to my method ReadData, so that the client code is not affected.

How can I achieve this? Please suggest.

TheWaterProgrammer
  • You know you can use `socket::cancel()` to cancel an async operation, right? – The Quantum Physicist Apr 03 '17 at 19:55
  • yes. I know that I should cancel the socket if the time_out is reached. But how do I employ a time_out in the async read in the first place? – TheWaterProgrammer Apr 03 '17 at 19:57
  • @TheQuantumPhysicist. Will `socket::cancel()` work on a synchronous read operation ? – TheWaterProgrammer Apr 03 '17 at 20:02
  • @SegmentationFault start the async_wait and the async_read at the same time. first one to finish should cancel the other. You'll need a flag to indicate that you have cancelled and you'll want to use a strand to manage thread contention. – Richard Hodges Apr 03 '17 at 20:03
  • I never had to use an internally available time out. But you have to keep in mind that async typically requires multithreading. You can run io_service in another thread, and then start a timer in the main thread, and when the timer is up, you cancel. Timers are part of C++11. – The Quantum Physicist Apr 03 '17 at 20:03
  • @TheQuantumPhysicist no, you'd run the two async operations on the same io service. there is not necessarily any need for more than one thread. – Richard Hodges Apr 03 '17 at 20:04
  • You have absolutely no hope in canceling a sync operation. Seriously, get it out of your head. We all had this dream when we were beginners. – The Quantum Physicist Apr 03 '17 at 20:04
  • @Richard maybe. I haven't thought it through. I'm though a big fan of running io_service in its own thread. – The Quantum Physicist Apr 03 '17 at 20:05
  • He does not need another thread or a strand. He can just run the `io_service` in the same thread, then reset it when it's done. Not pretty but does not make any difference since OP is not using async operations anyway. – sbabbi Apr 03 '17 at 20:12
  • @TheQuantumPhysicist @Richard Is it possible to get a bare minimum sample demonstrating what you guys are saying? I need to know how I can change my method `ReadData` to achieve this time_out behavior. Why doesn't `boost::asio` offer a simple time_out parameter in the simple `boost::asio::read` call? – TheWaterProgrammer Apr 03 '17 at 20:13
  • I'm sorry, I'm responding from my smartphone and gotta go sleep. I'll try to do it tomorrow maybe. – The Quantum Physicist Apr 03 '17 at 20:20
  • @TheQuantumPhysicist Please give it try whenever you get some time. thanks in advance :) – TheWaterProgrammer Apr 03 '17 at 20:22
  • @sbabbi no one mentioned strands, man. If io_service is running on one thread (doesn't have to be the main thread), then definitely strands are not needed. – The Quantum Physicist Apr 03 '17 at 20:22
  • just to create a time_out on a read operation, I need to go to the level of using strands ? couldn't it be simpler than that ? – TheWaterProgrammer Apr 03 '17 at 20:23
  • As I mentioned guys, My `ReadData` call is already in a worker thread. not sure if that helps or complicates the potential solution – TheWaterProgrammer Apr 03 '17 at 20:25
  • @TheQuantumPhysicist answer posted. recommend you check the asio docs for each of the techniques i have used here. asio is deep, and the documentation is extremely terse. – Richard Hodges Apr 03 '17 at 20:52

1 Answer

OK, something like this should suit your purpose:

Rationale:

You appear to want to use blocking operations. Since that is the case, there is a good chance that you're not running a thread to pump the io loop.

So we kick off two simultaneous async tasks on the socket's io loop and then spawn a thread to:

a) reset (actually restart) the loop in case it's already been exhausted

b) run the loop to exhaustion (we could be cleverer here and only run it until the handler has indicated that some condition has been met, but that's a lesson for another day)

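#include <boost/asio.hpp>

#include <cassert>
#include <cstddef>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>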

template<class Stream, class ConstBufferSequence, class Handler>
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler)
{
    using handler_type = std::decay_t<Handler>;
    using buffer_sequence_type = std::decay_t<ConstBufferSequence>;
    using stream_type = Stream;

    struct state_machine : std::enable_shared_from_this<state_machine>
    {
        state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler)
                : stream_(stream)
                , sequence_(std::move(sequence))
                , handler_(std::move(handler))
        {}
        void start(std::size_t millis)
        {
            timer_.expires_from_now(boost::posix_time::milliseconds(millis));
            timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) {
                self->handle_timeout(ec);
            }));
            boost::asio::async_read(stream_, sequence_,
                                    strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){
                self->handle_read(ec, size);
            }));
        }

        void handle_timeout(boost::system::error_code const& ec)
        {
            if (not ec and not completed_)
            {
                boost::system::error_code sink;
                stream_.cancel(sink);
            }
        }

        void handle_read(boost::system::error_code const& ec, std::size_t size)
        {
            assert(not completed_);
            boost::system::error_code sink;
            timer_.cancel(sink);
            completed_ = true;
            handler_(ec, size);
        }

        stream_type& stream_;
        buffer_sequence_type sequence_;
        handler_type handler_;
        boost::asio::io_service::strand strand_ { stream_.get_io_service() };
        boost::asio::deadline_timer timer_ { stream_.get_io_service() };
        bool completed_ = false;
    };

    auto psm = std::make_shared<state_machine>(stream,
                                               std::forward<ConstBufferSequence>(sequence),
                                               std::forward<Handler>(handler));
    psm->start(millis);
}

std::size_t ReadData(boost::asio::ip::tcp::socket& socket,
                     std::vector<unsigned char> & buffer,
                     unsigned int size_to_read,
                     boost::system::error_code& ec) {

    buffer.resize(size_to_read);

    ec.clear();
    std::size_t bytes_read = 0;
    auto& executor = socket.get_io_service();
    async_read_with_timeout(socket, boost::asio::buffer(buffer),
                            2000, // 2 seconds for example
                            [&](auto&& err, auto size){
        ec = err;
        bytes_read = size;
    });

    // todo: use a more scalable executor than spawning threads
    auto future = std::async(std::launch::async, [&] {
        if (executor.stopped()) {
            executor.reset();
        }
        executor.run();
    });
    future.wait();

    return bytes_read;
}
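
The rationale above notes that the loop could be run only until the handler reports completion, rather than to exhaustion. A minimal sketch of that idea follows; it is not part of the original answer. `run_until` is a hypothetical helper, written as a template so that it accepts anything offering `stopped()`, `reset()` and `run_one()` (which `boost::asio::io_service` does). `fake_loop` is an invented stand-in used only so the sketch compiles without Boost.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical helper: run handlers one at a time until `done` becomes true
// or the loop has no more work. Returns the number of handlers executed.
template <class IoLoop>
std::size_t run_until(IoLoop& loop, const bool& done)
{
    if (loop.stopped()) {
        loop.reset();               // restart a loop that previously ran out of work
    }
    std::size_t handlers_run = 0;
    while (!done) {
        std::size_t n = loop.run_one();   // executes at most one ready handler
        if (n == 0) {
            break;                  // out of work (or stopped): nothing left to wait for
        }
        handlers_run += n;
    }
    return handlers_run;
}

// Invented stand-in for an io loop, for illustration only. It mimics the
// three member functions that run_until relies on.
struct fake_loop
{
    void post(std::function<void()> f) { queue_.push_back(std::move(f)); }
    bool stopped() const { return stopped_; }
    void reset() { stopped_ = false; }
    std::size_t pending() const { return queue_.size(); }

    std::size_t run_one()
    {
        if (stopped_ || queue_.empty()) {
            stopped_ = true;        // running out of work stops the loop
            return 0;
        }
        auto f = std::move(queue_.front());
        queue_.pop_front();
        f();
        return 1;
    }

private:
    std::deque<std::function<void()>> queue_;
    bool stopped_ = false;
};
```

In ReadData, one would presumably declare `bool done = false;`, set it inside the completion lambda, and call `run_until(executor, done)` where `executor.run()` is called now. Any handler still queued at that point (for example the cancelled timer's) would then run on the next call.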
Richard Hodges
  • this looks very close to what I need. Yes you are correct. My read method is running in a loop in a worker thread (which is `std::thread`). However I am not familiar with strands. I've to read that up. – TheWaterProgrammer Apr 04 '17 at 07:19
  • what is 'stream_type' here ? error says member reference base type 'stream_type' (aka 'int (int, int, int)') is not a structure or union boost::asio::io_service::strand strand_ { stream_.get_io_service() }; – TheWaterProgrammer Apr 04 '17 at 07:21
  • can u pls check just once ? may be there is a small mistake in the answer ? can I call `cancel` on `stream` like this `stream_.cancel(sink)` ? cancel should be called on the socket. right ? – TheWaterProgrammer Apr 04 '17 at 07:24
  • @SegmentationFault stream_type is an alias for the socket/stream passed in as an argument – Richard Hodges Apr 04 '17 at 07:25
  • @SegmentationFault when coding with asio it's better to think of sockets as "streams that just happen to be sockets" – Richard Hodges Apr 04 '17 at 07:25
  • @SegmentationFault note: code is c++14 you may need some gerrymandering to get it compile for c++11 – Richard Hodges Apr 04 '17 at 07:26
  • @SegmentationFault one last thing. note that I am using the free function asio::read - if this completes with an error, it is still possible that some bytes have been read. usually it's not important as you are looking for a fixed-length frame, but it's something that caught me out once. It's because asio::read is a composed operation (see docs). Best of luck. – Richard Hodges Apr 04 '17 at 07:29
  • thanks a lot for getting me till here. I tested out your code. it is reading the bytes but not the full data that I need. I could get only 33 bytes of data in every read compared to 160062 bytes of data that `asio::read` fetches me normally. I need to tweak with your code to get the full data ? may be use `async_read_until` ? – TheWaterProgrammer Apr 04 '17 at 07:35
  • accepted your answer Richard. whether to use `async_read` or `async_read_until` is separate question i should ask. thanks till here – TheWaterProgrammer Apr 04 '17 at 07:37
  • @SegmentationFault it should read as many bytes as you have made available in the vector. However, the timeout is 2 seconds. perhaps you need longer? – Richard Hodges Apr 04 '17 at 07:45
  • little strange, but I changed `2000` to `20000` and it started working. looks like `2000` doesn't mean 2 seconds? the data I am trying to fetch with every read takes only 50 to 70 millisecs to arrive as a complete pack. but the good news is that the whole thing now works :) – TheWaterProgrammer Apr 04 '17 at 08:06
  • again my mistake with the time out thing. everything works. thanks – TheWaterProgrammer Apr 04 '17 at 11:12
  • one last point. you have done a `#include <type_traits>`. I am well able to compile & run the code without that include. – TheWaterProgrammer Apr 04 '17 at 17:51
  • @SegmentationFault no problem. probably something else you have included includes it somewhere down the chain - boost/asio is a likely candidate. Glad to have helped. – Richard Hodges Apr 04 '17 at 18:08
  • I am trying to do the same for a write with a timeout. I posted the question [here](http://stackoverflow.com/questions/43283029/boostasiowrite-with-a-timeout). Pls reply if you could – TheWaterProgrammer Apr 07 '17 at 16:47