1

My callback for async_write_some is not called after a one second sleep. If I am starting an io_service worker thread for every write, why is the callback not being called?

header

// Error-code storage; none of the functions shown here reads or writes it.
boost::system::error_code error_1;
// Held by shared_ptr because open_eth_socket() replaces it with a fresh instance on every call.
boost::shared_ptr <boost::asio::io_service> io_service_1;
// TCP socket constructed on *io_service_1; recreated by open_eth_socket().
boost::shared_ptr <boost::asio::ip::tcp::socket> socket_1;

connect

void eth_socket::open_eth_socket (void)
{
    // 1. reset io services
    io_service_1.reset();
    io_service_1 = boost::make_shared <boost::asio::io_service> ();

    // 2. create endpoint
    boost::asio::ip::tcp::endpoint remote_endpoint(
        boost::asio::ip::address::from_string("10.0.0.3"), 
        socket_1_port
    );

    // 3. reset socket
    socket_1.reset(new boost::asio::ip::tcp::socket(*io_service_1));                

    // 4. connect socket
    socket_1->async_connect(remote_endpoint,
        boost::bind(
            &eth_socket::socket_1_connect_callback,
            this, boost::asio::placeholders::error
        )
    );

    // 5. start io_service_1 run thread after giving it work
    boost::thread t(boost::bind(&boost::asio::io_service::run, *&io_service_1));                
    return;
}

write

void eth_socket::write_data (std::string data)
{   
    // 1. check socket status
    if (!socket_1->is_open())
    {
        WARNING << "socket_1 is not open";
        throw -3;
    }

    // 2. start asynchronous write
    socket_1->async_write_some(
        boost::asio::buffer(data.c_str(), data.size()),
        boost::bind(
            &eth_socket::socket_1_write_data_callback,
            this, boost::asio::placeholders::error, 
            boost::asio::placeholders::bytes_transferred
        )
    );

    // 3. start io_service_1 run thread after giving it work
    boost::thread t(boost::bind(&boost::asio::io_service::run, *&io_service_1));
    return;
}

callback

/**
 * Completion handler for the asynchronous write started by write_data.
 *
 * @param error              result of the write; logged and ignored when set.
 * @param bytes_transferred  number of bytes actually written.
 */
void eth_socket::socket_1_write_data_callback (const boost::system::error_code& error, size_t bytes_transferred)
{
    // 1. check for errors
    if (error)
    {
        ERROR << "error.message() >> " << error.message().c_str();
        return;
    }

    // 2. the socket may have been closed or replaced while the write was pending
    if (socket_1.get() == NULL || !socket_1->is_open())
    {
        // Message previously said "serial_port_1" (copy/paste from other code).
        WARNING << "socket_1 is not open";
        return;
    }

    INFO << "data written to 10.0.0.3:1337 succeeded; bytes_transferred = " << bytes_transferred;
}

test

// Call sequence from the question: three writes right after opening the
// socket, then a fourth write after a one-second pause.
open_eth_socket();
write_data("Hello");    // callback called
write_data("Hello");    // callback called
write_data("Hello");    // callback called
// one-second pause with no pending asynchronous operations started by this code
sleep(1);
write_data("Hello");    // callback not called after sleep
xinthose
  • 3,213
  • 3
  • 40
  • 59

2 Answers

1
boost::thread t(boost::bind(&boost::asio::io_service::run, *&io_service_1));                

That's weird for a number of reasons.

I'd add to these top-level concerns

  • the smell from using names like socket_1 (just call it socket_ and instantiate another object with a descriptive name to contain the other socket_). I'm not sure, but the question does raise suspicion these might even be global variables. (I hope that's not the case)
  • throw-ing raw integers, really?
  • You risk full-on data races by destroying the io_service without ever checking that the worker threads have completed.
  • More Undefined Behaviour here:

    _sock.async_write_some(
            ba::buffer(data.c_str(), data.size()),
    

    You pass a buffer that refers to the parameter data, which goes out of scope as soon as write_data returns. By the time the async operation actually runs, the buffer is a dangling reference.

  • There's some obvious copy/paste trouble going on here:

    if (socket_1.get() == NULL || !socket_1->is_open())
    {
        WARNING << "serial_port_1 is not open";
        return;
    }
    

    I'd actually say this stems from precisely the same source that led to the variable names being serial_port_1 and socket_1.

Some Cleanup

Simplify. There wasn't self-contained code, so nothing complete here, but at least see the many points of simplification:

Live On Coliru

#include <iostream>
#include <memory>
#include <string>
#include <utility>

#include <boost/asio.hpp>
#include <boost/thread.hpp>

// Shorthand for the Boost.Asio namespace and the two types used most below.
namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;

// Stand-ins for the ERROR / WARNING / INFO log streams used in the question;
// here all three simply write to std::cerr.
#define ERROR   std::cerr
#define WARNING std::cerr
#define INFO    std::cerr

/// Asynchronous TCP client example: owns one io_service, one worker thread
/// that runs it, and one socket created on it.
struct eth_socket {

    /// Releases the work object, then waits for the worker thread to leave
    /// _svc.run() before the members are destroyed.
    ~eth_socket() {
        _work.reset(); // destroy the io_service::work held in the optional
        if (_worker.joinable())
            _worker.join(); // wait for the worker thread to finish
    }

    /// Starts an asynchronous connect to `address` on port _port.
    void open(std::string address);
    /// Starts an asynchronous write of `data` on the socket.
    void write_data(std::string data);

  private:
    /// Completion handler for open(): logs the failure or the connected peer.
    void connected(error_code error) {
        if (error)
            ERROR << "Connect failed: " << error << "\n";
        else
            INFO << "Connected to " << _sock.remote_endpoint() << "\n";
    }
    /// Completion handler for write_data().
    void written(error_code error, size_t bytes_transferred);

  private:
    // Members are initialized in declaration order: _svc, then _work, then
    // _worker, so the work object already exists when the worker thread
    // starts calling _svc.run().
    ba::io_service _svc;
    // Work object on _svc, wrapped in an optional so the destructor can
    // destroy it early via reset().
    boost::optional<ba::io_service::work> _work{ _svc };
    // Thread started during construction; its body is a single _svc.run() call.
    boost::thread _worker{ [this] { _svc.run(); } };

    // String storage owned by the object (how it is used is up to write_data()).
    std::string _data;

    // Remote TCP port that open() connects to.
    unsigned short _port = 6767;
    // Socket constructed on _svc.
    tcp::socket _sock{ _svc };
};

void eth_socket::open(std::string address) {
    tcp::endpoint remote_endpoint(ba::ip::address::from_string(address), _port);

    _sock.async_connect(remote_endpoint, boost::bind(&eth_socket::connected, this, _1));
}

void eth_socket::write_data(std::string data) {
    _data = data;

    _sock.async_write_some(ba::buffer(_data), boost::bind(&eth_socket::written, this, _1, _2));
}

/// Completion handler for write_data(): logs the peer, the result message and
/// the number of bytes written.
///
/// Uses the non-throwing remote_endpoint overload: this runs on the worker
/// thread inside io_service::run(), and the throwing overload would let an
/// exception escape the thread when the socket is not connected (for example
/// when the write itself failed).
void eth_socket::written(error_code error, size_t bytes_transferred) {
    error_code endpoint_error;
    const tcp::endpoint peer = _sock.remote_endpoint(endpoint_error);

    INFO << "data written to ";
    if (endpoint_error)
        INFO << "<unknown endpoint>";
    else
        INFO << peer;
    INFO << " " << error.message() << ";"
         << "bytes_transferred = " << bytes_transferred << "\n";
}

int main() {
    {
        eth_socket s;
        s.open("127.0.0.1");

        s.write_data("Hello"); // callback called
        s.write_data("Hello"); // callback called
        s.write_data("Hello"); // callback called
        boost::this_thread::sleep_for(boost::chrono::seconds(1));
        s.write_data("Hello"); // callback not called after sleep

    } // orderly worker thread join here
}
sehe
  • 374,641
  • 47
  • 450
  • 633
  • Note this leaves the problem of a data race when `write_data` overlaps a prior write that is still reading from `_data`. I'll link to the common solution for that later. – sehe Nov 16 '17 at 18:09
  • Thank you. I figured my problem was with the `io_service` and thread. So the `_work` thread is always running? In the callback I should have `io_service_1->reset();` and start the `io_service` work thread after I connect? I thought the work thread runs out of work on the callback, therefore I need to start it for each write. – xinthose Nov 16 '17 at 20:07
  • ... Please just read the sample code. I didn't write up the sample for no reason. Extra background link, in case you missed it: [Why do we need to use `boost::asio::io_service::work`](https://stackoverflow.com/questions/17156541/why-do-we-need-to-use-boostasioio-servicework) (it refers to the very first bullet, though) – sehe Nov 16 '17 at 22:14
  • 1
    Oh. I didn't see this before, but `*&io_service_1` is definitely wrong. It's a tautology. You may have (uselessly) meant `&*io_service_1`. But `boost::bind` supports `shared_ptr<>` just fine, directly (making it a hell of a lot safer, too). – sehe Nov 16 '17 at 22:26
0

My problems are now fixed thanks to sehe's help and prayer.

This line in open_eth_socket:

boost::thread t(boost::bind(&boost::asio::io_service::run, *&io_service_1));

is now this:

boost::shared_ptr <boost::thread>  io_service_1_thread;    // in header

if (io_service_1_thread.get()) io_service_1_thread->interrupt();
io_service_1_thread.reset(new boost::thread (boost::bind(&eth_socket::run_io_service_1, this)));

I added this function:

void eth_socket::run_io_service_1 (void)
{
  while (true)  // work forever
  {
    boost::asio::io_service::work work(*io_service_1);
    io_service_1->run();
    io_service_1->reset();   // not sure if this will cause problems yet
    INFO << "io_service_1 run complete";
    boost::this_thread::sleep (boost::posix_time::milliseconds (100));
  }

    return;
}
xinthose
  • 3,213
  • 3
  • 40
  • 59