
Current Scheme

I am developing a serial port routine that treats the current receive transfer as complete if no new data is received for 25 milliseconds. I start the timer on the first read_handler call (the Boost ASIO completion callback). On every subsequent read_handler call, I cancel the asynchronous operations that are waiting on the timer and start a new asynchronous wait on the timer.

Problem

The problem I am facing is that, at random, a receive that was supposed to be 1 transfer is treated as 2 separate transfers, because the receive_timeout event (receive_timeout_handler) is triggered (called) multiple times.

I'm not sure whether this is caused by my incorrect implementation/usage of the Boost ASIO system_timer or by a driver issue in my USB-to-serial converter.
I'm currently using an FT4232 module (which contains 4 UART/serial ports) to test my routines: I send data (a 4 KB text file) from UART1 and receive it on UART0.
I expect the serial port class to signal the main thread only after all 4 KB of data has been received; however, sometimes this single 4 KB transfer is signalled 2-3 times.

Code :

class SerialPort
{
public:
    SerialPort() : io(), port(io), receive_timeout_timer(io) {}
    bool open_port(void);
    bool read_async(std::int32_t read_timeout = -1);
    void read_handler(const boost::system::error_code& error, std::size_t bytes_transferred);
    void receive_timeout_handler(const boost::system::error_code& error);
private:
    static constexpr std::int32_t   bulk_data_receive_complete = 25; // idle timeout in milliseconds
    boost::asio::io_context         io;
    boost::asio::serial_port        port;
    boost::asio::system_timer       receive_timeout_timer;
    std::array<std::byte, 8096>     read_byte_buffer;
    std::int32_t                    read_timeout = -1;
    std::string                     received_data;
};


bool SerialPort::open_port(void)
{
    try
    {
        this->port.open("COM3");
        return true;
    }
    catch (const std::exception& ex)
    {
    }
    return false;
}

bool SerialPort::read_async(std::int32_t read_timeout)
{
    try
    {
        this->read_byte_buffer.fill(static_cast<std::byte>(0)); //Clear Buffer
        if (read_timeout not_eq -1)
        {
            this->read_timeout = read_timeout;//If read_timeout is not set to ignore_timeout, update the read_timeout else use old read_timeout
        }
        this->port.async_read_some(
            boost::asio::buffer(
                this->read_byte_buffer.data(),
                this->read_byte_buffer.size()
            ),
            boost::bind(
                &SerialPort::read_handler,
                this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred
            )
        );
        return true;
    }
    catch (const std::exception& ex)
    {
        return false;
    }
}

void SerialPort::read_handler(const boost::system::error_code& error, std::size_t bytes_transferred)
{
    std::string temporary_recieve_data;
    try
    {
        if (error not_eq boost::system::errc::success)  //Error in serial port read
        {
            return;
        }
        std::transform(this->read_byte_buffer.begin(), this->read_byte_buffer.begin() + bytes_transferred,
            std::back_inserter(temporary_recieve_data), [](std::byte character) {
                return static_cast<char>(character);
            }
        );
        this->read_async(); //Again Start the read operation
        this->received_data += temporary_recieve_data;
        this->receive_timeout_timer.cancel();   // Cancel existing timers if any are running
        this->receive_timeout_timer.expires_after(boost::asio::chrono::milliseconds(SerialPort::bulk_data_receive_complete));   // Reset timer to current timestamp + 25  milliseconds
        this->receive_timeout_timer.async_wait(boost::bind(&SerialPort::receive_timeout_handler, this, boost::asio::placeholders::error));
    }
    catch (const std::exception& ex)
    {
    }
}

void SerialPort::receive_timeout_handler(const boost::system::error_code& error)
{
    try
    {
        if (error not_eq boost::system::errc::success)  //Timer wait was cancelled or failed
        {
            return;
        }

        // this->signal(this->port_number, SerialPortEvents::read_data, this->received_data); //Signal to main thread that data has been received
    }
    catch (const std::exception& ex)
    {
    }
}

[Figure: Protocol Block Diagram]

Dark Sorrow

1 Answer

    read_timer.cancel(); // Cancel existing timers if any are running
    read_timer.expires_after(
        SerialPort::bulk_data_receive_complete); // Reset timer to current timestamp + 25  milliseconds

Here the cancel is redundant, because setting the expiration cancels any pending wait.

You reschedule the timer regardless of whether it ran out. Your code misses the possibility that both the read and timer could have completed successfully. In that case your main gets signaled multiple times, even though it only "nearly" exceeded 25ms idle.

You would expect to see partially duplicated data, then, because received_data isn't cleared.
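The ordering that leads to the double signal can be replayed with a toy model. This is only a sketch and does not use Asio: ToyQueue, ToyTimer and simulate_double_signal are hypothetical names. It mimics one documented property of Asio timers, namely that a wait whose handler has already been queued can no longer be cancelled and still completes with a success code.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Toy stand-ins for an io_context queue and a timer; NOT the Asio API.
struct ToyQueue {
    std::deque<std::function<void()>> ready; // handlers already queued to run
    void run() {
        while (!ready.empty()) {
            auto h = std::move(ready.front());
            ready.pop_front();
            h();
        }
    }
};

struct ToyTimer {
    ToyQueue& queue;
    std::function<void(bool aborted)> pending; // wait that has not expired yet

    void async_wait(std::function<void(bool)> h) { pending = std::move(h); }

    // Expiry queues the handler with aborted == false.
    void expire() {
        if (pending) {
            queue.ready.push_back([h = std::move(pending)] { h(false); });
            pending = nullptr;
        }
    }

    // cancel() only affects a wait that is still pending; returns how many it aborted.
    int cancel() {
        if (!pending) return 0;
        queue.ready.push_back([h = std::move(pending)] { h(true); });
        pending = nullptr;
        return 1;
    }
};

// Replays the problematic ordering; returns how often "transfer complete" fired.
int simulate_double_signal() {
    ToyQueue q;
    ToyTimer timer{q};
    int signals = 0;
    auto on_timeout = [&](bool aborted) { if (!aborted) ++signals; };

    timer.async_wait(on_timeout);   // armed by the first read completion
    timer.expire();                 // 25 ms pass: timeout handler is now queued
    int cancelled = timer.cancel(); // a late read arrives, but nothing is pending
    assert(cancelled == 0);
    timer.async_wait(on_timeout);   // re-armed for the same transfer
    timer.expire();                 // the line really goes idle later
    q.run();
    return signals;                 // both timeout handlers ran with success
}
```

In this model the single transfer is reported twice, which matches the symptom in the question: the cancel came too late to stop the first timeout handler.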

To clearly see what is going on, build your code with -DBOOST_ASIO_ENABLE_HANDLER_TRACKING=1 and run the output through handler_viz.pl (see also Cancelling boost asio deadline timer safely).

Suggestions

You could probably avoid the double firing by being explicit about the flow, so that completion is signalled from exactly one place.

To achieve that, only cancel the read from the timeout handler:

void SerialPort::receive_timeout_handler(error_code ec) {
    if (!ec.failed()) {
        port.cancel(ec);
        std::cerr << "read canceled: " << ec.message() << std::endl;
    }
}

Then you could move the signal to the read-handler, where you expect the cancellation:

void SerialPort::read_handler(error_code ec, size_t bytes_transferred) {
    if (ec == asio::error::operation_aborted) {
        signal(port_number, SerialPortEvents::read_data, std::move(received_data));
    } else if (ec.failed()) {
        std::cerr << "SerialPort read: " << ec.message() << std::endl;
    } else {
        copy_n(begin(read_buffer), bytes_transferred, back_inserter(received_data));

        read_timer.expires_after(bulk_data_receive_complete); // reset timer
        read_timer.async_wait(boost::bind(&SerialPort::receive_timeout_handler, this, ph::error));
        start_async_read(); // continue reading
    }
}

To be completely fool-proof, you can check that the timer hasn't actually expired even on a successful read (see again Cancelling boost asio deadline timer safely).
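One way such a check might look is sketched below using only std::chrono, so that it stands on its own. The helper name deadline_passed is hypothetical. The sketch assumes a default build in which asio::system_timer is based on std::chrono::system_clock, so the first argument would come from read_timer.expiry().

```cpp
#include <chrono>

using Clock = std::chrono::system_clock; // assumed to match asio::system_timer

// Hypothetical helper: true when the idle deadline had already been reached,
// even if the read that just completed reported success.
inline bool deadline_passed(Clock::time_point expiry,
                            Clock::time_point now = Clock::now()) {
    return expiry <= now;
}
```

In the success branch of read_handler, a call such as deadline_passed(read_timer.expiry()) could then decide whether the bytes that just arrived still belong to the current transfer or should start a new one.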

Intuitively, I think it makes even more sense to schedule the timer from start_async_read.

ASIDE #1

Currently your code completely ignores read_timeout (even aside from the unnecessary confusion between the argument read_timeout and the member read_timeout). It is unclear to me whether you want the read_timeout override argument to "stick" for the entire chain of read operations.

If you want it to stick, change the

start_async_read(bulk_data_receive_complete); // continue reading

call to

start_async_read(); // continue reading

below. I kept it as it is because it allows for easier timing demonstrations.
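The "sticky" behaviour can be modelled in isolation. The sketch below covers only the timeout bookkeeping, with no I/O, and uses std::optional in place of the sentinel value from the demo. TimeoutPolicy and next are hypothetical names.

```cpp
#include <chrono>
#include <optional>

using Duration = std::chrono::milliseconds;

// Hypothetical model of the timeout bookkeeping only (no I/O).
class TimeoutPolicy {
  public:
    explicit TimeoutPolicy(Duration default_timeout) : current_(default_timeout) {}

    // An empty override keeps whatever was set last ("sticky");
    // a value replaces it for this and all later reads.
    Duration next(std::optional<Duration> override_timeout = std::nullopt) {
        if (override_timeout)
            current_ = *override_timeout;
        return current_;
    }

  private:
    Duration current_;
};
```

Calling next() with no argument from the read handler keeps an earlier 10s override in force for the whole chain, whereas passing 25ms on every continuation resets it, which is what the demo does.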

ASIDE #2

I've undone the exception swallowing code. Instead of just squashing all exceptions into a boolean (which you'll then check to change control flow), use the native language feature to change the control flow, retaining error information.
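As a rough illustration of the difference, the sketch below lets the failure propagate as an exception and handles it where the caller chooses, keeping the reason for the failure. open_port_or_throw and try_open are hypothetical stand-ins that do not touch a real device; in the demo the exception would come from port.open.

```cpp
#include <string>
#include <system_error>

// Hypothetical stand-in for a port open that can fail; no real device access.
void open_port_or_throw(const std::string& device) {
    if (device.empty())
        throw std::system_error(std::make_error_code(std::errc::no_such_device),
                                "open_port");
}

// The caller decides where to handle the failure and still sees the reason.
std::string try_open(const std::string& device) {
    try {
        open_port_or_throw(device);
        return "ok";
    } catch (const std::system_error& e) {
        return std::string("failed: ") + e.code().message();
    }
}
```

A bool return value would have reduced the second case to "false", with the error code and message lost.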

Full Demo

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/signals2.hpp>
#include <array>
#include <cassert>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <string>

namespace asio = boost::asio;
namespace ph   = boost::asio::placeholders;
using boost::system::error_code;
using namespace std::chrono_literals;

enum class SerialPortEvents { read_data };

class SerialPort {
    using duration = std::chrono::system_clock::duration;
    static constexpr duration                         //
        ignore_timeout             = duration::min(), // e.g. -0x8000000000000000ns
        bulk_data_receive_complete = 25ms;

  public:
    SerialPort() : io(), port(io), read_timer(io) {}

    void open_port(std::string device);
    void start_async_read(duration read_timeout = ignore_timeout);

    void run() {
        if (io.stopped())
            io.restart();
        io.run();
    }

    boost::signals2::signal<void(unsigned, SerialPortEvents, std::string)> signal;

  private:
    void read_handler(error_code ec, size_t bytes_transferred);
    void receive_timeout_handler(error_code ec);

    duration read_timeout = bulk_data_receive_complete;

    asio::io_context       io;
    asio::serial_port      port;
    asio::system_timer     read_timer;
    std::array<char, 8096> read_buffer;
    std::string            received_data;

    // TODO
    unsigned const port_number = 0;
};

void SerialPort::open_port(std::string device) { port.open(device); }

void SerialPort::start_async_read(duration timeout_override) {
    read_buffer.fill(0); // Clear Buffer (TODO redundant)

    if (timeout_override != ignore_timeout)
        read_timeout = timeout_override;

    std::cerr << "Expiry: " << read_timeout/1.s << "s from now" << std::endl;
    read_timer.expires_after(read_timeout); // reset timer
    read_timer.async_wait(boost::bind(&SerialPort::receive_timeout_handler, this, ph::error));

    port.async_read_some( //
        boost::asio::buffer(read_buffer),
        boost::bind(&SerialPort::read_handler, this, ph::error, ph::bytes_transferred));
}

void SerialPort::read_handler(error_code ec, size_t bytes_transferred) {
    if (ec == asio::error::operation_aborted) {
        signal(port_number, SerialPortEvents::read_data, std::move(received_data));
    } else if (ec.failed()) {
        std::cerr << "SerialPort read: " << ec.message() << std::endl;
    } else {
        copy_n(begin(read_buffer), bytes_transferred, back_inserter(received_data));
        start_async_read(bulk_data_receive_complete); // continue reading
    }
}

void SerialPort::receive_timeout_handler(error_code ec) {
    if (!ec.failed()) {
        port.cancel(ec);
        std::cerr << "read canceled: " << ec.message() << std::endl;
    }
}

int main(int argc, char** argv) {
    SerialPort sp;
    sp.open_port(argc > 1 ? argv[1] : "COM3");

    int count = 0;
    sp.signal.connect([&count](unsigned port, SerialPortEvents event, std::string data) {
        assert(port == 0);
        assert(event == SerialPortEvents::read_data);
        std::cout << "data #" << ++count << ": " << std::quoted(data) << "\n----" << std::endl;
    });

    sp.start_async_read(10s);
    sp.run();

    sp.start_async_read();
    sp.run();
}

Testing with

socat -d -d pty,raw,echo=0 pty,raw,echo=0
./build/sotest /dev/pts/7

And various device emulations:

for a in hello world bye world; do sleep .01; echo  "$a"; done >> /dev/pts/9
for a in hello world bye world; do sleep .025; echo  "$a"; done >> /dev/pts/9
for a in hello world bye world; do sleep 1.0; echo  "$a"; done >> /dev/pts/9
cat /etc/dictionaries-common/words >> /dev/pts/9

You can see all the outputs match with the expectations. With the sleep .025 you can see the input split over two read operations, but never with repeated data.


Handler tracking graphs were generated for each of the four runs.

The last one (literally throwing the dictionary at it) is way too big to be useful: https://i.stack.imgur.com/zBLJN.jpg

Simplifying Notes

Note that your entire SerialPort re-implements a composed read operation. You might simplify all that to asio::async_read_until with a MatchCondition.

This has the added benefit of letting you use asio::dynamic_buffer(received_data) directly.

Here's a simpler version that doesn't use a timer, but instead updates the deadline inside the manual run() loop.

It uses a single composed read operation with a MatchCondition that checks when the connection is "idle".
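The call shape of such a condition can be sketched without Asio: it takes two iterators and returns a pair of (iterator, bool), where the bool says whether the read is complete. IdleMatch and its injectable now clock are hypothetical. Real Asio would additionally need the type to be recognised as a match condition, which is what the wrapper in the listing below takes care of.

```cpp
#include <chrono>
#include <functional>
#include <string>
#include <utility>

using Clock = std::chrono::steady_clock;

// Hypothetical idle-based match condition, templated on the iterator type so
// it can be exercised on a plain std::string without Asio.
struct IdleMatch {
    Clock::duration idle_timeout;
    std::function<Clock::time_point()> now; // injectable clock for testing
    Clock::time_point deadline = Clock::time_point::max();

    // Called whenever new data has arrived: reports a match if the previous
    // deadline had already passed, then pushes the deadline forward.
    template <typename It>
    std::pair<It, bool> operator()(It /*begin*/, It end) {
        auto t    = now();
        bool idle = t >= deadline;
        deadline  = t + idle_timeout;
        return {end, idle}; // consume everything seen so far
    }
};
```

A condition like this only runs when data arrives, so it cannot detect silence by itself. That is why the version below also polls the deadline in its run loop and cancels the port.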

Live On Coliru

#include <boost/asio.hpp>
#include <cassert>
#include <chrono>
#include <functional>
#include <iomanip>
#include <iostream>
#include <string>
#include <utility>

namespace asio = boost::asio;
using namespace std::chrono_literals;

enum class SerialPortEvents { read_data };

class SerialPort {
    using Clock    = std::chrono::system_clock;
    using Duration = Clock::duration;

    static constexpr Duration default_idle_timeout = 25ms;

  public:
    void open_port(std::string device);
    void read_till_idle(Duration idle_timeout = default_idle_timeout);

    std::function<void(unsigned, SerialPortEvents, std::string)> signal;

  private:
    asio::io_context   io;
    asio::serial_port  port{io};
    std::string        received_data;
};

void SerialPort::open_port(std::string device) { port.open(device); }

namespace {
    // Asio requires nested result_type to be MatchCondition... :(
    template <typename F> struct AsMatchCondition {
        using CBT         = boost::asio::dynamic_string_buffer<char, std::char_traits<char>,
              std::allocator<char>>::const_buffers_type;
        using It          = asio::buffers_iterator<CBT>;
        using result_type = std::pair<It, bool>;

        F _f;
        AsMatchCondition(F f) : _f(std::move(f)) {}

        auto operator()(It f, It l) const { return _f(f, l); }
    };
}

void SerialPort::read_till_idle(Duration idle_timeout) {
    if (io.stopped())
        io.restart();

    using T              = Clock::time_point;
    T    start           = Clock::now();
    auto current_timeout = idle_timeout;
    auto deadline        = T::max();

    auto is_idle = [&](T& new_now) { // atomic w.r.t. a new_now
        new_now = Clock::now();
        return new_now >= deadline;
    };

    auto update = [&](int invocation) {
        auto previous = start;
        bool idle     = is_idle(start);

        if (invocation > 0) {
            current_timeout = default_idle_timeout; // or not, your choice

            std::cerr << " [update deadline for current timeout:" << current_timeout / 1ms << "ms after "
                      << (start - previous) / 1ms << "ms]" << std::endl;
        }

        deadline = start + current_timeout;
        return idle;
    };

    int  invocation = 0; // to avoid updating current_timeout on first invocation
    auto condition  = AsMatchCondition([&](auto, auto e) { return std::pair(e, update(invocation++)); });

    async_read_until(port, asio::dynamic_buffer(received_data), condition,
                     [this](auto...) { signal(0, SerialPortEvents::read_data, std::move(received_data)); });

    for (T t; !io.stopped(); io.run_for(5ms))
        if (is_idle(t))
            port.cancel();
}

void data_received(unsigned port, SerialPortEvents event, std::string data) {
    static int count = 0;
    assert(port == 0);
    assert(event == SerialPortEvents::read_data);
    std::cout << "data #" << ++count << ": " << std::quoted(data) << std::endl;
}

int main(int argc, char** argv) {
    SerialPort sp;
    sp.signal = data_received;
    sp.open_port(argc > 1 ? argv[1] : "COM3");

    sp.read_till_idle(3s);
}

Same local demos: https://imgur.com/a/WPVELE9

sehe
    I also made a (simpler?) version that doesn't use a timer, but instead updates the deadline inside the manual `run()` loop. It uses a single composed read operation with a `MatchCondition` that checks when the connection is "idle". [Now 98 lines of code](http://coliru.stacked-crooked.com/a/4da3215db382286c), same local demos: https://imgur.com/a/WPVELE9 – sehe Jan 22 '23 at 23:36