2

My application is based on the asio chat example and consists of a client and a server: - Client: Connect to the server, receive requests and respond to it - Server: Has a QT GUI (main thread) and a network service (separate thread) listening for connections, sending requests to particular clients and interprets the response from/in the GUI

I want to achieve this in an asynchronous way to avoid a seperate thread for each client connection.

In my QT window, I have one io_service instance and one instance of my network service:

io_service_ = new asio::io_service();
asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), "1234");
service_ = new Service(*io_service_, endpoint, this);

asio::io_service* ioServicePointer = io_service_;
t = std::thread{ [ioServicePointer](){ ioServicePointer->run(); } };

I want to be able to send data to one client, like this:

service_->send_message(selectedClient.id, msg);

And I am receiving and handling the responses via the observer pattern (the window implements the IStreamListener interface)

Service.cpp:

#include "Service.h"
#include "Stream.h"

void Service::runAcceptor()
{
    acceptor_.async_accept(socket_,
        [this](asio::error_code ec)
    {
        if (!ec)
        {
            std::make_shared<Stream>(std::move(socket_), &streams_)->start();

        }

        runAcceptor();
    });
}

void Service::send_message(std::string streamID, chat_message& msg)
{

    io_service_.post(
        [this, msg, streamID]()
    {
        auto stream = streams_.getStreamByID(streamID);
        stream->deliver(msg);
    }); 

}

Stream.cpp:

#include "Stream.h"
#include <iostream>
#include "../chat_message.h"

Stream::Stream(asio::ip::tcp::socket socket, StreamCollection* streams)
    : socket_(std::move(socket))
{
    streams_ = streams;         // keep a reference to the streamCollection

    // retrieve endpoint ip
    asio::ip::tcp::endpoint remote_ep = socket_.remote_endpoint();
    asio::ip::address remote_ad = remote_ep.address();
    this->ip_ = remote_ad.to_string();      
}

void Stream::start()
{
    streams_->join(shared_from_this());
    readHeader();
}

void Stream::deliver(const chat_message& msg)
{
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress)
    {
        write();
    }
}

std::string Stream::getName()
{
    return name_;
}

std::string Stream::getIP()
{
    return ip_;
}


void Stream::RegisterListener(IStreamListener *l)
{
    m_listeners.insert(l);
}

void Stream::UnregisterListener(IStreamListener *l)
{
    std::set<IStreamListener *>::const_iterator iter = m_listeners.find(l);
    if (iter != m_listeners.end())
    {
        m_listeners.erase(iter);
    }
    else {
        std::cerr << "Could not unregister the specified listener object as it is not registered." << std::endl;
    }
}

void Stream::readHeader()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.data(), chat_message::header_length),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec && read_msg_.decode_header())
        {
            readBody();
        }
        else if (ec == asio::error::eof || ec == asio::error::connection_reset)
        {
            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onStreamDisconnecting(this->id()); });
            streams_->die(shared_from_this());
        }
        else
        {
            std::cerr << "Exception: " << ec.message();
        }
    });
}

void Stream::readBody()
{
    auto self(shared_from_this());
    asio::async_read(socket_,
        asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
                    // notify the listener (GUI) that a response has arrived and pass a reference to it

            auto msg = std::make_shared<chat_message>(std::move(read_msg_));

            std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamListener *l) {l->onMessageReceived(msg); });

            readHeader();
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

void Stream::write()
{
    auto self(shared_from_this());
    asio::async_write(socket_,
        asio::buffer(write_msgs_.front().data(),
        write_msgs_.front().length()),
        [this, self](asio::error_code ec, std::size_t /*length*/)
    {
        if (!ec)
        {
            write_msgs_.pop_front();
            if (!write_msgs_.empty())
            {
                write();
            }
        }
        else
        {
            streams_->die(shared_from_this());
        }
    });
}

Interfaces

 class IStream
{
public: 
    /// Unique stream identifier
    typedef void* TId;
    virtual TId id() const
    {
        return (TId)(this);
    }

    virtual ~IStream() {}
    virtual void deliver(const chat_message& msg) = 0;

    virtual std::string getName() = 0;
    virtual std::string getIP() = 0;

    /// observer pattern    
    virtual void RegisterListener(IStreamListener *l) = 0;
    virtual void UnregisterListener(IStreamListener *l) = 0;
};

 class IStreamListener
{
public:
    virtual void onStreamDisconnecting(IStream::TId streamId) = 0;
    virtual void onMessageReceived(std::shared_ptr<chat_message> msg) = 0;

};

/*
    streamCollection / service delegates
*/
class IStreamCollectionListener
{
public:
    virtual void onStreamDied(IStream::TId streamId) = 0;
    virtual void onStreamCreated(std::shared_ptr<IStream> stream) = 0;

};

StreamCollection is basically a set of IStreams:

 class StreamCollection
{
public:
    void join(stream_ptr stream)
    {
        streams_.insert(stream);
        std::for_each(m_listeners.begin(), m_listeners.end(), [&](IStreamCollectionListener *l) {l->onStreamCreated(stream); });


    }
    // more events and observer pattern inplementation

First of all: The code works as intended so far.

My question: Is this the way ASIO is supposed to be used for asynchronous programming? I'm especially unsure about the Service::send_message method and the use of io_service.post. What is it's purpose in my case? It did work too when I just called async_write, without wrapping it in the io_service.post call.

Am I running into problems with this approach?

Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169
user66875
  • 2,772
  • 3
  • 29
  • 55

1 Answers1

2

Asio is designed to be a tookit rather than a framework. As such, there are various ways to successfully use it. Separating the GUI and network threads, and using asynchronous I/O for scalability can be a good idea.

Delegating work to the io_service within a public API, such as Service::send_message(), has the following consequences:

  • decouples the caller's thread from the thread(s) servicing the io_service. For example, if Stream::write() performs a time consuming cryptographic function, the caller thread (GUI) would not be impacted.
  • it provides thread-safety. The io_service is thread-safe; however socket is not thread-safe. Additionally, other objects may not be thread safe, such as write_msgs_. Asio guarantees that handlers will only be invoked from within threads running the io_servce. Consequently, if only one thread is running the io_service, then there is no possibility for concurrency and both socket_ and write_msgs_ will be accessed in a thread-safe manner. Asio refers to this as an implicit strand. If more than one thread is processing the io_service, then one may need to use an explicit strand to provide thread safety. See this answer for more details on strands.

Additional Asio considerations:

  • Observers are invoked within handlers, and handlers are running within the network thread. If any observer takes a long time to complete, such as having to synchronize with various shared objects touched by the GUI thread, then it could create poor responsiveness across other operations. Consider using a queue to broker events between the observer and subject components. For instance, one could use another io_service as a queue, that is being ran by its own thread, and post into it:

    auto msg = std::make_shared<chat_message>(std::move(read_msg_));
    for (auto l: m_listeners)
        dispatch_io_service.post([=](){ l->onMessageReceived(msg); });
    
  • Verify that the container type for write_msgs_ does not invalidate iterators, pointers and references to existing elements on push_back() and other elements for pop_front(). For instance, using std::list or std::dequeue would be safe, but a std::vector may invalidate references to existing elements on push_back.

  • StreamCollection::die() may be called multiple times for a single Stream. This function should either be idempotent or handle the side effects appropriately.
  • On failure for a given Stream, its listeners are informed of a disconnect only in one path: failing to read a header with an error of asio::error::eof or asio::error::connection_reset. Other paths do not invoke IStreamListener.onStreamDisconnecting():
    • the header is read, but decoding failed. In this particular case, the entire read chain will stop without informing other components. The only indication that a problem has occurred is a print statement to std::cerr.
    • when there is a failure reading the body.
Community
  • 1
  • 1
Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169
  • Code-review feedback: consider using ranged-based for loops; be consistent with styling and naming: `streams_` vs `m_listeners`; the `Service` and `Stream` family types could benefit from more meaningful names (e.g. is `IStream` and input stream or a stream interface)? – Tanner Sansbury Mar 07 '16 at 00:22
  • Thank you very much for your feedback! – user66875 Mar 07 '16 at 05:54