0

I'm working on a RS485 communication class and I'm trying to make a function that reads until a certain char is on the line, but with a time out. The problem is that my system timer immediately returns, doesn't matter which time out I enter. I tried changing the timer to be a member variable of the class, so it doesn't go out of scope, but that wasn't the problem. I tried different implementations of timers (deadline_timer mostly) but that didn't help. If I remove the timer from the code, then the read succeeds, but when I add it, even if I give it a timeout of 10 seconds (which should be waay more than enough), it will respond with an immediate timeout.

I tried making a simple version of the class here, but I guess that the options mostly depend on the type of machine you're talking to:

class RS485CommunicationLayer final {
public:
    RS485CommunicationLayer(
            const std::string& path,
            /* options */
    ): io(), port(io), timer(port.get_io_service()) {
        open(/* options */);
    };

    std::size_t write(const char* const buffer, const size_t size) {
        /*impl*/
    }
// THIS FUNCTION --v
    void readUntil(std::vector<char>& buffer, char delim,std::chrono::microseconds timeout) {
        boost::optional<boost::system::error_code> timer_result;
        boost::optional<boost::system::error_code> read_result;
        
        port.get_io_service().reset();
        
        timer.expires_from_now(timeout);
        boost::asio::async_read_until(port,  asio::dynamic_buffer(buffer), delim, [&read_result] (const boost::system::error_code& error, size_t) { read_result.reset(error); });
        timer.async_wait([&timer_result] (const boost::system::error_code& error) { timer_result.reset(error); });
    
        
        while (port.get_io_service().run_one())
        { 
            if (read_result)
                timer.cancel();
            else if (timer_result) {
                port.cancel();
            }
        }
    
        if (read_result)
            throw boost::system::system_error(*read_result);
    };

private:
    asio::io_context io;
    asio::serial_port port;
    boost::asio::system_timer timer;

    void open(/*args*/) {
       port.open(path);
       /*set options*/
    }
};

Edit: I also tried the following implementation after finding out that run_for() exists. But then the buffer stays empty weirdly enough.

void RS485CommunicationLayer::readUntil(std::vector<char>& buffer, char delim, std::chrono::microseconds timeout) {
    boost::optional<boost::system::error_code> read_result;
    
    boost::asio::async_read_until(port,  asio::dynamic_buffer(buffer), delim, [&read_result] (const boost::system::error_code& error, size_t) { read_result.reset(error); });

    port.get_io_service().run_for(timeout);
    if (read_result)
        throw boost::system::system_error(*read_result);
}
Typhaon
  • 828
  • 8
  • 27

1 Answers1

1

First off, get_io_service() indicates a Very Old(TM) boost version. Also, it just returns io.

Secondly, why so complicated? I don't even really have the energy to see whether there is a subtle problem with the run_one() loop (it looks fine at a glance).

I'd simplify:

size_t readUntil(std::vector<char>& buffer, char delim,
                 std::chrono::microseconds timeout) {
    error_code read_result;
    size_t     msglen = 0;

    io.reset();
    asio::system_timer timer(io, timeout);
    asio::async_read_until(port, asio::dynamic_buffer(buffer), delim,
                           [&](error_code ec, size_t n) {
                               timer.cancel();
                               read_result = ec;
                               msglen      = n;
                           });
    timer.async_wait([&](error_code ec) { if (!ec) port.cancel(); });
    io.run();

    if (read_result)
        boost::throw_with_location(boost::system::system_error(read_result),
                                   read_result.location());

    return msglen;
}

You can just cancel the complementary IO object from the respective completion handlers.

The timer is per-op and local to the readUntil, so it doesn't have to be a member.

Let's also throw in the write side, which is all of:

size_t write(char const* const data, const size_t size) {
    return asio::write(port, asio::buffer(data, size));
}

And I can demo it working:

Live On Coliru

#include <boost/asio.hpp>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using boost::system::error_code;
using namespace std::chrono_literals;

class RS485CommunicationLayer final {
  public:
    RS485CommunicationLayer(std::string const& path) : io(), port(io) { open(path); };

    size_t write(char const* const data, const size_t size) {
        return asio::write(port, asio::buffer(data, size));
    }

    size_t readUntil(std::vector<char>& buffer, char delim,
                     std::chrono::microseconds timeout) {
        error_code read_result;
        size_t     msglen = 0;

        io.reset();
        asio::system_timer timer(io, timeout);
        asio::async_read_until(port, asio::dynamic_buffer(buffer), delim,
                               [&](error_code ec, size_t n) {
                                   timer.cancel();
                                   read_result = ec;
                                   msglen      = n;
                               });
        timer.async_wait([&](error_code ec) { if (!ec) port.cancel(); });
        io.run();

        if (read_result)
            boost::throw_with_location(boost::system::system_error(read_result),
                                       read_result.location());

        return msglen;
    }

  private:
    asio::io_context  io;
    asio::serial_port port;

    void open(std::string path) {
        port.open(path);
        /*set options*/
    }

    void close();
};

int main(int argc, char** argv) {
    RS485CommunicationLayer comm(argc > 1 ? argv[1] : "");

    comm.write("Hello world\n", 12);
    for (std::vector<char> response_buffer;
         auto              len = comm.readUntil(response_buffer, '\n', 100ms);) //
    {
        std::cout << "Received " << response_buffer.size() << " bytes, next "
                  << quoted(std::string_view(response_buffer.data(), len - 1))
                  << std::endl;

        // consume
        response_buffer.erase(begin(response_buffer), begin(response_buffer) + len);
    }
}

Demo locally with a socat PTS tunnel:

socat -d -d pty,raw,echo=0 pty,raw,echo=0

And throwing dictionaries at the other end:

while true; do cat /etc/dictionaries-common/words ; done | pv > /dev/pts/10

enter image description here

sehe
  • 374,641
  • 47
  • 450
  • 633
  • Side notes: Consider making `readUntil` hide the buffer internally (where it belongs, because it affects subsequent reads). Also I would consider not "abusing" `reset()` and `run()` in this way, e.g. using a packaged_task or promise. (An example I made yesterday https://stackoverflow.com/a/74791925/85371) – sehe Dec 15 '22 at 17:47