I have a Boost.Asio project that I am at my wits' end with. I have defined a TCPConnect class which is inherited by both a TCPSession and a TCPClient class. I did this because I would like both the server and client sides of the connection to be usable directly for sending and receiving messages over full-duplex TCP/IP. Only the TCPSession class listens for new connections, and only the TCPClient class makes an outgoing connection.
When the client connects to the server it sends a handshake message and blocks for the response. The server receives the message and sends a handshake ack back. On sending the ack, the server considers the connection completed. On receiving the ack, the client considers the connection completed.
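To make the handshake sequence concrete, here is a minimal in-memory sketch of the exchange, with no sockets involved. The `Packet` struct and the `make_handshake`, `acknowledge`, and `is_valid_ack` helpers are hypothetical stand-ins for my real CmdPacket class; they only illustrate the order of events described above.

```cpp
#include <cassert>

// Hypothetical stand-in for CmdPacket: only the fields the handshake needs.
struct Packet {
    int srcId = 0;
    int destId = 0;
    bool handshake = false;
    bool acknowledge = false;
};

// Client side: build the initial handshake request.
Packet make_handshake(int srcId, int destId) {
    Packet p;
    p.srcId = srcId;
    p.destId = destId;
    p.handshake = true;
    return p;
}

// Server side: echo the handshake back with the acknowledge flag set.
// After sending this reply the server treats the connection as completed.
Packet acknowledge(const Packet& request) {
    Packet reply = request;
    reply.acknowledge = true;
    return reply;
}

// Client side: the connection counts as completed only if the reply is an
// acknowledged handshake for the same pair of ids.
bool is_valid_ack(const Packet& request, const Packet& reply) {
    return reply.handshake && reply.acknowledge &&
           reply.srcId == request.srcId && reply.destId == request.destId;
}
```

In the real code the client blocks between sending the request and receiving the reply; here the two steps are simply called one after the other.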
The problem that I am having is that only the client side (the TCPClient object) can call its inherited TCPConnect::Send() and have it be received by the remote TCPSession object. If the server side (the TCPSession object) calls TCPConnect::Send(), it puts the message on the line without a problem, but the message is never received by the remote TCPClient object.
I must say I am a total beginner at working with boost. I have looked into this issue, trying to word and reword my search query, but have not found a solution. On both sides I have the TCPSession and TCPClient objects sitting on an async_read(), but on the client side the callback for the async_read() does not get called when the server side sends a message. I am assuming this has to do with how the io_service is set up on the TCPSession object's side, and possibly with threading.
A TCPServer is started inside a TCPWorker::run() function, which runs in its own thread. The TCPWorker class owns the io_service object. The threaded run() function instantiates a TCPServer object and then calls io_service::run(). The TCPServer object is responsible for creating TCPSession objects for new connections. For each new connection, the TCPServer creates a new TCPSession object with the io_service pointer and calls async_accept on its acceptor. Once a new connection is accepted, the TCPSession's socket is set up for an async_read(). So the io_service and all of the sockets created on it are serviced by one thread.
On the TCPClient side, when Connect() is called, if an io_service doesn't exist and/or a thread does not yet exist for the io_service, they are created using the following lines:
    if (!m_pClientServiceThread)
    {
        if (!m_pclient_io_service) // if no io_service has yet been created
            m_pclient_io_service = new boost::asio::io_service;
        m_pClientServiceThread = new boost::thread(boost::bind(&boost::asio::io_service::run, m_pclient_io_service));
    }
So the service is being run in a thread. Then a new resolver and a new boost::asio::ip::tcp::socket are created with the io_service pointer, and boost::asio::connect() is called with the socket and a valid resolved endpoint. So the client seems to have its io_service running in a thread. Having successfully made the connection, I then send a handshake message using boost::asio::write(), and sit on boost::asio::read() waiting for the handshake response. On receiving a valid response, I pass the socket to an async_read() to wait for incoming messages.
I have looked at this for so long now without figuring out why the client side async_read() does not receive a message sent from the server side, even though the server side receives the message that is sent from the client side.
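One thing I am unsure about is the order of operations here. As far as I understand the documentation, io_service::run() returns as soon as there is no outstanding work, and on the client side the thread calls run() before any asynchronous operation has been queued. The toy event loop below is not Boost code: `ToyLoop`, `post`, and `run` are hypothetical names that only model that documented rule, so the ordering question is easy to see.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Toy model only (not Boost): run() executes queued handlers and returns
// as soon as the queue is empty.
class ToyLoop {
public:
    void post(std::function<void()> handler) { handlers_.push(std::move(handler)); }

    // Returns the number of handlers that were executed.
    std::size_t run() {
        std::size_t executed = 0;
        while (!handlers_.empty()) {
            std::function<void()> handler = std::move(handlers_.front());
            handlers_.pop();
            handler();
            ++executed;
        }
        return executed;
    }

private:
    std::queue<std::function<void()>> handlers_;
};

// run() is called before anything is queued, so it returns immediately and
// the handler posted afterwards is never executed.
bool handler_runs_when_run_is_called_first() {
    ToyLoop loop;
    bool called = false;
    loop.run();
    loop.post([&called] { called = true; });
    return called;
}

// The handler is queued before run(), so run() executes it.
bool handler_runs_when_posted_first() {
    ToyLoop loop;
    bool called = false;
    loop.post([&called] { called = true; });
    loop.run();
    return called;
}
```

In real Asio, the usual way to keep run() from returning early is to hold a boost::asio::io_service::work object for as long as the loop should stay alive. Whether that is what my client side is missing is exactly what I cannot tell.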
Please help me to figure this out. I am quite sure there is something simple I am not seeing, but as I have said, I am not a boost expert, so I am not sure what it is.
Added code:
TcpConnect class:
class TcpConnect : public boost::enable_shared_from_this<TcpConnect>
{
public:
TcpConnect(boost::asio::io_service* pio_service, ConnType connType, std::string sHostIp, int iHostPort)
: m_pio_service(pio_service)
, eConnType(connType)
, m_strHostIp(sHostIp)
, m_iHostPort(iHostPort)
{
}
virtual ~TcpConnect() { /* does what is needed - this works */ }
bool SendBlocked(CmdPacket& msg, CmdPacket& rsp)
{
// Function used for handling connection handshake response
std::size_t write_length = boost::asio::write( *m_pSocket, boost::asio::buffer(msg.Serialize(), (std::size_t)msg.SerializedLength()));
if (msg.SerializedLength() != write_length)
{
return false;
}
boost::asio::streambuf sbuff;
boost::system::error_code error;
size_t reply_length(0);
// read header for message body length
byte* buffer = rsp.CreateBuffer(MSG_HEADER_LENGTH);
reply_length = boost::asio::read(*m_pSocket, boost::asio::buffer(buffer, MSG_HEADER_LENGTH), boost::asio::transfer_exactly(MSG_HEADER_LENGTH), error);
if (error || !rsp.ReadMessageLength())
{
/* error handled here */
return false;
}
// read message body
int expectedlen = rsp.BodyLength();
buffer = rsp.CreateBuffer(expectedlen);
reply_length = boost::asio::read(*m_pSocket, boost::asio::buffer(buffer, expectedlen), boost::asio::transfer_exactly(expectedlen), error);
if (error)
{
/* error handled here */
return false;
}
if (!rsp.Deserialize() || reply_length != rsp.BodyLength())
{
/* error handled here */
return false;
}
return true;
}
bool Send(CmdPacket& msg)
{
bool bStatus = true;
size_t write_length = 0;
try
{
write_length = boost::asio::write( *m_pSocket, boost::asio::buffer(msg.Serialize(), (std::size_t)msg.SerializedLength()) );
}
catch (...)
{
/* error handled here */
return false;
}
if (write_length != msg.SerializedLength())
{
/* error handled here */
return false;
}
return true;
}
void StartAsyncRead()
{
m_pInMsg = new CmdPacket();
boost::asio::async_read(*m_pSocket, boost::asio::buffer(m_pInMsg->CreateBuffer(MSG_HEADER_LENGTH), MSG_HEADER_LENGTH),
boost::bind(&TcpConnect::handle_read_header, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read_header(const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error && bytes_transferred == MSG_HEADER_LENGTH && m_pInMsg->ReadMessageLength())
{
boost::asio::async_read(*m_pSocket,
boost::asio::buffer(m_pInMsg->CreateBuffer(m_pInMsg->SerializedLength()), m_pInMsg->SerializedLength()),
boost::bind(&TcpConnect::handle_read_body, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else if (error)
{
/* errors handled */
}
}
void handle_read_body(const boost::system::error_code& error, size_t bytes_transferred)
{
bool deserialized = false;
if (!error && (deserialized = m_pInMsg->Deserialize()))
{
// if not yet connected, expecting handshake message, in which case acknowledge, otherwise error
if (m_pInMsg->IsHandshake())
{
m_pInMsg->SetAcknowledge(true);
std::size_t write_length = boost::asio::write(
*m_pSocket, boost::asio::buffer(m_pInMsg->Serialize(), (std::size_t)m_pInMsg->SerializedLength()));
if (write_length == m_pInMsg->SerializedLength())
{
/* we sent the acknowledgement, so we consider we're connected */
}
else
{
/* handling error here */
return;
}
delete m_pInMsg;
m_pInMsg = NULL;
}
// if graceful disconnect, notify the connection manager of new status, which will remove the connection from the map
else if (m_pInMsg->IsDisconnect())
{
/* disconnect request handled here */
return;
}
else
{
/* message received, passing it to the local process here */
}
// set up to handle the next read
m_pInMsg = new CmdPacket;
boost::asio::async_read(*m_pSocket, boost::asio::buffer(m_pInMsg->CreateBuffer(MSG_HEADER_LENGTH), MSG_HEADER_LENGTH),
boost::bind(&TcpConnect::handle_read_header, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
else if (error)
{
/* handle case where boost error */
}
else if (!deserialized)
{
/* handle case where message not correctly deserialized */
}
}
protected:
ConnType eConnType;
boost::asio::ip::tcp::socket* m_pSocket{ 0 };
boost::asio::io_service* m_pio_service;
boost::asio::ip::tcp::resolver::iterator m_itEndpoint;
CmdPacket* m_pInMsg{ 0 };
std::string m_strHostIp;
int m_iHostPort;
};
typedef boost::shared_ptr<TcpConnect> ConnectionPtr;
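Both the blocking and the asynchronous read paths in TcpConnect follow the same two-step framing: read a fixed-size header, take the body length from it, then read exactly that many bytes. The sketch below shows that framing on a plain byte vector, without any sockets. The 4-byte big-endian header and the names `kHeaderLength`, `frame`, `read_body_length`, and `unframe` are assumptions for illustration only; the real CmdPacket layout and the value of MSG_HEADER_LENGTH are not shown in this post.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Assumed header size; the real MSG_HEADER_LENGTH may differ.
const std::size_t kHeaderLength = 4;

// Sender side: prefix the body with its length as a 4-byte big-endian header.
std::vector<std::uint8_t> frame(const std::string& body) {
    const std::uint32_t len = static_cast<std::uint32_t>(body.size());
    std::vector<std::uint8_t> out;
    out.push_back(static_cast<std::uint8_t>((len >> 24) & 0xFF));
    out.push_back(static_cast<std::uint8_t>((len >> 16) & 0xFF));
    out.push_back(static_cast<std::uint8_t>((len >> 8) & 0xFF));
    out.push_back(static_cast<std::uint8_t>(len & 0xFF));
    out.insert(out.end(), body.begin(), body.end());
    return out;
}

// Receiver step 1: decode the body length from the header bytes.
// Returns false if fewer than kHeaderLength bytes are available.
bool read_body_length(const std::vector<std::uint8_t>& data, std::uint32_t& len) {
    if (data.size() < kHeaderLength) return false;
    len = (static_cast<std::uint32_t>(data[0]) << 24) |
          (static_cast<std::uint32_t>(data[1]) << 16) |
          (static_cast<std::uint32_t>(data[2]) << 8) |
          static_cast<std::uint32_t>(data[3]);
    return true;
}

// Receiver step 2: extract exactly `len` body bytes following the header.
// Returns false if the header or the body is incomplete.
bool unframe(const std::vector<std::uint8_t>& data, std::string& body) {
    std::uint32_t len = 0;
    if (!read_body_length(data, len)) return false;
    if (data.size() - kHeaderLength < len) return false;
    body.clear();
    for (std::size_t i = 0; i < len; ++i) {
        body.push_back(static_cast<char>(data[kHeaderLength + i]));
    }
    return true;
}
```

With sockets, step 1 corresponds to the first read of MSG_HEADER_LENGTH bytes and step 2 to the second read of the body, which is why both ends have to agree on which length the header carries.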
TcpClient class:
class TcpClient : public TcpConnect
{
public:
TcpClient(boost::asio::io_service& io_service, ConnType connType, const std::string& sIP, int iPort)
: TcpConnect(&io_service, connType, sIP, iPort)
{
}
~TcpClient() { /* does what is needed - this works */ }
//virtual ObjType Type() { return OT_CLIENT; }
//virtual int sessionId() { return -1; } // client end does not have a session id
// Use the following to initialize and to reestablish the connection.
bool Connect()
{
bool bStatus = true;
//Convert the port to a string
std::stringstream ss;
ss << m_iHostPort;
std::string strPort = ss.str();
//Establish the connection
try
{
boost::system::error_code ec;
// create TCP resolver and query and resolve the endpoint
boost::asio::ip::tcp::resolver resolver(*m_pio_service);
boost::asio::ip::tcp::resolver::query query(m_strHostIp.c_str(), strPort.c_str());
boost::asio::ip::tcp::resolver::iterator m_iterEndpoint = resolver.resolve(query, ec);
if (ec)
{
/* error handled here */
bStatus = false;
}
else
{
// close an old socket (shouldn't ever be the case)
if (m_pSocket != NULL) CloseSocket(); /* NOTE: this is defined in TcpConnect, but not shown here */
// create the socket on the io_service object and connect to the endpoint
m_pSocket = new boost::asio::ip::tcp::socket(*m_pio_service);
boost::asio::connect(*m_pSocket, m_iterEndpoint, ec);
if (ec)
{
/* error handled here */
bStatus = false;
}
}
} //end try
catch(...)
{
/* error handled here */
bStatus = false;
}
return bStatus;
}
};
typedef boost::shared_ptr<TcpClient> TcpClientPtr;
TcpServer class (run by TcpWorker and creates TcpSession objects):
class TcpServer;
class TcpSession : public TcpConnect
{
public:
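TcpSession(boost::asio::io_service& io_service)
: TcpConnect(&io_service) /* NOTE: assumes a TcpConnect constructor overload taking only the io_service, not shown here */
, m_session_id(next_session_id())
{
// m_pSocket is a member of the base class, so it has to be assigned in the body rather than in the initializer list
m_pSocket = new tcp::socket(io_service);
}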
virtual ~TcpSession() { /* NOTE: m_pSocket is owned and deleted by TcpConnect */ }
private:
int next_session_id()
{
static int id = 0;
return (++id > 0) ? id : 1;
}
private:
int m_session_id;
};
typedef boost::shared_ptr<TcpSession> TcpSessionPtr;
class TcpServer
{
public:
TcpServer(boost::asio::io_service& io_service, short port)
: m_pio_service(&io_service)
, m_acceptor(io_service, tcp::endpoint(tcp::v4(), port))
{
m_acceptor.listen();
}
~TcpServer()
{
boost::system::error_code errorcode;
m_acceptor.close(errorcode);
}
void start_accept()
{
TcpSessionPtr new_session(new TcpSession(*m_pio_service));
// start listening for this session
m_acceptor.async_accept(new_session->socket(),
boost::bind(&TcpServer::handle_accept, this, new_session,
boost::asio::placeholders::error));
new_session.reset();
}
private:
void handle_accept(TcpSessionPtr new_session, const boost::system::error_code& error)
{
if (!error)
{
new_session->StartAsyncRead(); /* NOTE: there is code for aggregating session objects */
/* NOTE: The result of an async_read() will be handled in TcpConnect::handle_read_header() */
}
else
{
/* error handled here */
}
// listen for the next connection
start_accept();
}
private:
boost::asio::io_service* m_pio_service;
tcp::acceptor m_acceptor;
};
class TcpWorker
{
public:
TcpWorker(unsigned int port)
: m_port(port)
{}
~TcpWorker() {}
void StopWorker()
{
if (!m_io_service.stopped())
{
m_io_service.stop();
while (!m_io_service.stopped()) { boost::this_thread::sleep(boost::posix_time::milliseconds(1)); }
}
}
void operator()() // threaded run function started from Communicator::Listen()
{
TcpServer server(m_io_service, (short)m_port);
server.start_accept(); // set up async_accept() for listening
std::size_t inumhandlers = m_io_service.run(); // blocks here until StopWorker() is called
}
private:
unsigned int m_port;
boost::asio::io_service m_io_service;
bool m_running;
};
Communicator class:
class Communicator {
public:
Communicator() = default;
~Communicator() { /* does what is needed - this works */ }
bool Listen()
{
if (!m_pServerThread || !m_pServerWorker)
{
m_pServerWorker = new TcpWorker(m_myPort);
m_pServerThread = new boost::thread(&TcpWorker::operator(), m_pServerWorker);
return true;
}
return false;
}
bool Connect(int srcId, int destId, std::string ipaddr, int port)
{
bool ret = false;
if (connected(destId))
{
ret = true;
}
else
{
// if io_service is not running, start it (happens if never started, or if no remaining client sockets running)
if (!ClientThreadRunning())
{
if (m_pClientThread) // since going to create a new thread, make sure this one is deleted if exists
delete m_pClientThread;
if (!m_pclient_io_service) // if no io_service has yet been created
m_pclient_io_service = new boost::asio::io_service;
m_pClientThread = new boost::thread(boost::bind(&boost::asio::io_service::run, m_pclient_io_service));
}
// create the connection. Wait for Ack before returning.
TcpClientPtr client(new TcpClient(*m_pclient_io_service, destId, ipaddr, port));
// connect to the client and do a handshake
if (client->Connect())
{
// if an initial handshake works, we're connected
CmdPacket msg(CMD_NONE, srcId, destId, port, ipaddr), rsp;
msg.SetHandshake(); // this starts the handshake protocol, which is completed on receiving the necessary response.
if (!client->SendBlocked(msg, rsp) || rsp != msg)
{
client.reset();
ret = false;
}
else
{
// Connected, now set up for asynchronous reading
client->StartAsyncRead();
// save it in the class
connection = client;
ret = true;
}
}
// decrement reference count, if not added to shared pointer map, this will set client object for deletion
client.reset();
}
return ret;
}
bool sendMessage(CmdPacket& msg)
{
bool bret = false;
if (connection != nullptr)
{
bret = connection->Send(msg);
}
return bret;
}
private:
TcpWorker* m_pServerWorker{ 0 };
boost::thread *m_pServerThread{ 0 };
boost::thread* m_pClientThread{ 0 };
boost::asio::io_service* m_pclient_io_service{ 0 };
ConnectionPtr connection{ 0 };
};