3

I have a TCP Client where calling a regular connect on a tcp socket works fine. However, the call to async_connect never fires the handler. They both use almost the same code. The difference is just the call to connect vs async_connect.

Header

#ifndef TCPCLIENT_H
#define TCPCLIENT_H

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>

#include <boost/thread/thread.hpp> 
#include <boost/thread/mutex.hpp>

using boost::asio::ip::tcp;

//How about an interface for outputting errors
class BoostTCPDebugOutputInterface
{
public:
    virtual void outputError(std::string& error)=0;
    virtual void outputDebug(std::string& debug) = 0;
    virtual void outputWarning(std::string& warning) = 0;
    virtual void outputInfo(std::string& info) = 0;

};

class ShutdownCallback
{
public:
    virtual void shutdown() = 0;
};

class BoostTCPConnection 
    : public boost::enable_shared_from_this<BoostTCPConnection>
{
public:
    typedef boost::shared_ptr<BoostTCPConnection> Connection_ptr;

    static Connection_ptr create(boost::asio::io_service& io_service, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner)
    {
        return Connection_ptr(new BoostTCPConnection(io_service, output, shutdowner));
    }

    virtual ~BoostTCPConnection(void);

    void start();
    bool isRunning();
    void setRunning(bool running);

    void enqueueMessageToSend(boost::shared_ptr<std::string>& message);

    boost::asio::ip::tcp::socket& socket()
    {
        return socket_;
    }

    void getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead);

    void getMessages(std::list< boost::shared_ptr<std::string> >& messages);

protected:
    BoostTCPConnection(boost::asio::io_service& io_service, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner) : socket_(io_service), errorOutput_(output), shutdowner_(shutdowner) {}

    void send();
    void handleWrite(const boost::system::error_code& error,size_t bytes_transferred);
    void handleReceive(const boost::system::error_code& error,size_t bytes_transferred);

    void readMessage();
    void handleReceive();

    std::string hostIP_;
    unsigned int hostPort_;

    boost::asio::io_service io_service_;
    boost::asio::ip::tcp::socket socket_;

    boost::mutex runningMutex_;
    bool running_;

    boost::mutex readMutex_;
    std::list< boost::shared_ptr<std::string> > receivedMsgs_;

    boost::mutex sendMutex_;
    std::list< boost::shared_ptr<std::string> > sendMsgs_;

    BoostTCPDebugOutputInterface* errorOutput_;
    ShutdownCallback* shutdowner_;

    static const size_t HEADERSIZE = 4;
};

class TCPClient
{
public:
    TCPClient();
    virtual ~TCPClient();

    bool start(std::string& hostIP,  unsigned int& hostPort, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner);
    void stop();

    void getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead);

    void send(boost::shared_ptr<std::string>& message);
    void getMessages(std::list< boost::shared_ptr<std::string> >& messages);

protected:
    void threadAction();

    void handleConnect(const boost::system::error_code& error);


    BoostTCPConnection::Connection_ptr connection_;

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;

    BoostTCPDebugOutputInterface* errorOutput_;
    ShutdownCallback* shutdowner_;
};

#endif

CPP File

#include "TCPClient.h"

#include <iostream>

BoostTCPConnection::~BoostTCPConnection(void)
{
}

void BoostTCPConnection::start()
{
    setRunning(true);
    while (isRunning())
    {

        bool readData(false);
        bool wroteData(false);

        if (!socket_.is_open())
        {
            std::string info("BoostTCPConnection::start() socket is no longer open.  ");
            errorOutput_->outputError(info);
            shutdowner_->shutdown();
            //Stop this NOW!!!
        }

        //Check if there are 4 bytes for packet size 
        //If there are read the size and then do an read to get the packet
        //The handler function should put the packet on a queue.
        boost::asio::socket_base::bytes_readable command(true);

        socket_.io_control(command);
        std::size_t bytes_readable = command.get();

        if ( bytes_readable >= HEADERSIZE )
        {
            readMessage();
            readData=true;
        }

        size_t sendSize(0);
        {
            boost::mutex::scoped_lock(sendMutex_);
            sendSize = sendMsgs_.size();
        }

        if ( sendSize > 0)
        {
            send();
        }


        if ( !readData && !wroteData )
            boost::this_thread::sleep(boost::posix_time::milliseconds(5)); 
    }
}

void BoostTCPConnection::readMessage()
{
    size_t messageSize(0);

    char temp[4]="";
    std::vector<char> header(4);

    boost::system::error_code ec;

    //Read the header which is the size
    size_t read=boost::asio::read(socket_, boost::asio::buffer(header), ec);
    if (ec)
    {       
        std::string info("BoostTCPConnection::readMessage errorcode ");
        info+=ec.message();
        errorOutput_->outputError(info);
        shutdowner_->shutdown();
        //TODO Signal the GUI to stop
        return;
    }
    memcpy((void*)(&messageSize),(void*)header.data(),4);

    std::vector<char> rcvBuffer(messageSize);

    read=boost::asio::read(socket_, boost::asio::buffer(rcvBuffer),ec);
    if (ec)
    {

        std::string info("BoostTCPConnection::readMessage errorcode ");
        info+=ec.message();
        errorOutput_->outputError(info);
        shutdowner_->shutdown();
        //TODO Signal the GUI to stop
        return;
    }

    rcvBuffer.push_back('\0');

    std::string test(rcvBuffer.begin(),rcvBuffer.end());
    boost::shared_ptr<std::string> message(new std::string(rcvBuffer.begin(),rcvBuffer.end()));

    receivedMsgs_.push_back(message);

}

void BoostTCPConnection::getMessages(std::list< boost::shared_ptr<std::string> >& messages)
{

    if (messages.size() > 0)
        messages.clear();

    {
        boost::mutex::scoped_lock lock(readMutex_); 
        receivedMsgs_.swap(messages);
    }
}

void BoostTCPConnection::handleReceive(const boost::system::error_code& error, size_t bytes_transferred)
{

    if (error)
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::handleReceive got an error Code of "<<error.value()<<" and message "<<error.message()<<" bytes_transferred = "<<bytes_transferred<<std::endl;
        errorOutput_->outputError(oss.str());
        shutdowner_->shutdown();
        return;
    }

}


bool BoostTCPConnection::isRunning()
{
    boost::mutex::scoped_lock lock(runningMutex_);
    return running_;
}

void BoostTCPConnection::setRunning(bool running)
{
    boost::mutex::scoped_lock lock(runningMutex_);
    running_=running;
}

void BoostTCPConnection::enqueueMessageToSend(boost::shared_ptr<std::string>& message)
{
    boost::mutex::scoped_lock lock(sendMutex_);
    sendMsgs_.push_back(message);
}

void BoostTCPConnection::getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead)
{
    {
        boost::mutex::scoped_lock lock(sendMutex_);
        tcpToSend=sendMsgs_.size();
    }

    {
        boost::mutex::scoped_lock lock(readMutex_);
        tcpRead=receivedMsgs_.size();
    }

}


void BoostTCPConnection::send()
{
    if (sendMsgs_.empty())
        return;

    boost::shared_ptr<std::string> message;
    {
        message=sendMsgs_.front();
        sendMsgs_.pop_front();
    }

    char temp[4];
    size_t messageSize=message->size();
    memcpy(temp,&messageSize, 4);

    message->insert(0,temp, 4);

    boost::asio::async_write(socket_, boost::asio::buffer(*message),
        boost::bind(&BoostTCPConnection::handleWrite, shared_from_this(),
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));

}

void BoostTCPConnection::handleWrite(const boost::system::error_code& error,size_t bytes_transferred)
{
    //Success
    if (error.value() == 0) 
        return;

    std::ostringstream oss;
    oss<< "BoostTCPConnection::handleWrite  got an error Code of "<<error.value()<<" and message "<<error.message()<<" with bytes_transferred = "<<bytes_transferred<<std::endl;
    errorOutput_->outputError(oss.str());
    shutdowner_->shutdown();

}

//***************************************************
//              TCPClient
//***************************************************

TCPClient::TCPClient()
{

}

TCPClient::~TCPClient()
{

}

void TCPClient::threadAction()
{
    io_service_.run();
}

bool TCPClient::start(std::string& hostIP,  unsigned int& hostPort, BoostTCPDebugOutputInterface* output, ShutdownCallback* shutdowner)
{
    bool bResult(false);
    errorOutput_=output;
    connection_ = BoostTCPConnection::create(io_service_, output, shutdowner);
    shutdowner_=shutdowner;

    //Use multiple threads to do my bidding
    for( int x = 0; x < 3; ++x )
    {
        worker_threads_.create_thread( 
            boost::bind( &TCPClient::threadAction, this )
            );
    }

    boost::system::error_code ec;
    try
    {
        boost::asio::ip::tcp::endpoint ep( boost::asio::ip::address_v4::from_string(hostIP), hostPort);
        if (connection_)
        {


            connection_->socket().async_connect(ep,
                                                boost::bind(&TCPClient::handleConnect, 
                                                this, boost::asio::placeholders::error));

                            //Synchronous code that works fine
                            //boost::system::error_code ec;         
            //connection_->socket().connect(ep, ec);
            /*if (!ec)
            {
                worker_threads_.create_thread( 
                    boost::bind( &TCPClient::handleConnect, this, ec ));

                bResult=true;
            }
            else
            {
                std::ostringstream oss;
                oss<< "BoostTCPConnection::start has an error "<<ec.message()<<std::endl;
                errorOutput_->outputError(oss.str());

            }*/

        }
    }
    catch (std::exception& e)
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::start received the exception "<<e.what()<<std::endl;
        errorOutput_->outputError(oss.str());
    }

    return bResult;
}

void TCPClient::handleConnect(const boost::system::error_code& error)
{

    if (error )
    {
        std::ostringstream oss;
        oss<< "BoostTCPConnection::handleConnect received the error "<<error.message()<<std::endl;
        errorOutput_->outputError(oss.str());
        shutdowner_->shutdown();
        return;
    }

    if (connection_)
        connection_->start();

}

void TCPClient::stop()
{
    if (connection_)
        connection_->setRunning(false);

    io_service_.stop();
    worker_threads_.join_all();
}

void TCPClient::getQueueSize(unsigned long& tcpToSend, unsigned long& tcpRead)
{
    if (connection_)
        connection_->getQueueSize(tcpToSend, tcpRead);
    else
    {
        tcpToSend=0;
        tcpRead=0;
    }
}

void TCPClient::send(boost::shared_ptr<std::string>& message)
{
    if (connection_)
        connection_->enqueueMessageToSend(message);
}

void TCPClient::getMessages(std::list< boost::shared_ptr<std::string> >& messages)
{ 
    if (connection_)
        connection_->getMessages(messages);
}
Mike Pennington
  • 41,899
  • 19
  • 136
  • 174
Thomas Lann
  • 1,124
  • 5
  • 17
  • 35
  • nit: identifiers beginning with an underscore and capital letter are [reserved in any scope](http://stackoverflow.com/a/228797/283302). You should rename your include guard. – Sam Miller Nov 09 '12 at 01:05
  • 1
    This is really, really awful code. Boost provides such a beautiful way to dispatch threads, why would you write `sleep` loops and create dedicated I/O services?! – David Schwartz Nov 09 '12 at 01:07
  • Ok I renamed the guards. – Thomas Lann Nov 09 '12 at 05:24
  • David and Sam. I'm very open to a better way of doing this. The code is the result of me first getting a synchronous version running and then trying to get an async version of everything running and probably not understanding the examples. One of the problems I had with the examples is they all seemed to just echo something back to the client. The client and the server in the above case would need to send packets back and forth at the same time. The examples seemed to call async_read and then async_write. But in my case I would need to do both. What would be the best way of doing this? – Thomas Lann Nov 09 '12 at 05:28
  • If you need to call both, call both. – David Schwartz Nov 09 '12 at 14:41
  • @DavidSchwartz I'm having a hard time understanding how this would be structured. I started a new question because it seems to deviate from this one a bit. If you don't mind I would love if you took a [look](http://stackoverflow.com/questions/13314192/having-a-hardtime-understanding-a-few-concepts-with-boost-asio-tcp-with-async-re) – Thomas Lann Nov 09 '12 at 19:00

2 Answers2

5

The problem is that your io_service has an empty queue when started and returns immediately:

void TCPClient::threadAction() {
  io_service_.run();
  assert(0); // triggers right away because there's no async operation queued
}

From Stopping the io_service from running out of work:

Some applications may need to prevent an io_service object's run() call from returning when there is no more work to do. For example, the io_service may be being run in a background thread that is launched prior to the application's asynchronous operations. The run() call may be kept running by creating an object of type io_service::work.

Anonymous Coward
  • 6,186
  • 1
  • 23
  • 29
2

You need to run the io_service for async handlers to fire.

Sam Miller
  • 23,808
  • 4
  • 67
  • 87