4

Consider the following code:

void TCP::sendMessage(const std::string& msg) {
  std::ostream os{&m_send_streambuf};
  os << msg;

  if (not m_pending_send) {
    doSend();
  }
}

void TCP::doSend() {
  m_pending_send = true;
  boost::asio::async_write( m_socket, m_send_streambuf
                          , [this](boost::system::error_code ec, std::size_t len)
                            {
                              if (ec) {
                                  throw std::runtime_error(ec.message());
                              }

                              m_send_streambuf.consume(len);

                              if (m_send_streambuf.size() >= 1) {
                                // There's still some data to send
                                doSend();
                              }
                              else {
                                m_pending_send = false;
                              }
                          });
}

m_send_streambuf is a boost::asio::streambuf and m_pending_send indicates wether an asynchronous write is pending.

I'm not sure if it's a good idea to use the streambuf like this as a call to sendMessage will modify the buffer, while an asynchronous write is possibly running.

So, is it safe to use the streambuf like this? Or maybe I should use some kind of double buffering?

Lightness Races in Orbit
  • 378,754
  • 76
  • 643
  • 1,055
Alexandre Hamez
  • 7,725
  • 2
  • 28
  • 39

2 Answers2

4

The documentation of async_write regarding the buffers parameter states:

One or more buffers containing the data to be written. Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

So you should not do any other write operations to the stream buffer, since that may render the underlying memory blocks passed to async_write invalid.

What you could do is simply use a stringstream to buffer the incoming data and once a pending asynchronous write has finished flush the contents to the boost stream buffer and dispatch a new asynchronous write. Of course you can use more complex mechanisms like maintaining a pool of stream buffer that keep their state information (pending or available) and fetch a new stream buffer from the pool on each call of sendMessage and dispatch a new asynchronous write. It all depends on what are your requirements - throughput and latency or memory consumption.

Community
  • 1
  • 1
Rudolfs Bundulis
  • 11,636
  • 6
  • 33
  • 71
  • The documentation warns about overlapping asynchronous writes; `m_pending_send` is there to avoid this. – Alexandre Hamez Oct 12 '15 at 11:51
  • "Of course you can use more complex mechanisms" - google "scatter gather asio" for the usual pattern – sehe Oct 12 '15 at 11:54
  • @AlexandreHamez But with `os << msg` you actually write to the stream, so you already breaking this rule. – Rudolfs Bundulis Oct 12 '15 at 13:08
  • @AlexandreHamez to be more exact - I usually build everything directly onto IOCP or epoll and I haven't explicitly looked if boost performs any other copies from the `streambuf` to other temporary buffers, but if it does not, consider a simple scenario - your dispatched `async_write` has got the underlying buffer pointer of the `streambuf` and is pending and the write operation you perform in `sendMessage` makes the stream reallocate the underlying memory storage while thus invalidating the pointer obtained by the asynchronous write. – Rudolfs Bundulis Oct 12 '15 at 13:17
  • @RudolfsBundulis Indeed, this scenario is sufficient to show that a problem might happen, and based on this, I'll change my code. However, I still think that what is stated in the documentation is no related. The stream which is mentioned in the documentation is the socket, no the streambuf. – Alexandre Hamez Oct 12 '15 at 15:26
  • @AlexandreHamez actually you are right, since I knew what to answer I blindly found the first sentence in the documentation that matched my thoughts and took it as a valid argument. The citation I gave initially as you already mentioned regarded the stream, however the documentation makes a similar statement about the buffers that are passed as argument, I edited the answer accordingly:) – Rudolfs Bundulis Oct 12 '15 at 15:36
  • The [`async_write()`](http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/async_write/overload3.html) overload for `streambuf` requires that the `streambuf` remain valid until the write handler is invoked; its intermediate [`async_write_some()`](http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/basic_stream_socket/async_write_some.html) operations require that the underlying buffer remain valid. If there are no outstanding intermediate operations, one can safely modify the `streambuf` (e.g. the composed operation `consume()`s after each intermediate operation). – Tanner Sansbury Oct 13 '15 at 15:34
3

This is not safe.

The async_write() operation is composed of zero or more calls to stream.async_write_some() which requires that the provided buffer's underlying memory block remain valid until the completion handler is invoked:

[...] ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

The async_write() operation will obtain a buffer representing the basic_streambuf's input sequence via basic_streambuf::data(). Writing to the basic_streambuf, such as through std::ostream, can invalidate the buffer, as noted in the documentation:

The returned object is invalidated by any basic_streambuf member function that modifies the input sequence or output sequence.

One elegant solution to buffering data to be written whilst write operations are pending is to use a queue, as is demonstrated in Sam's answer here.


When providing the basic_streambuf directly to operations, the operations themselves will handle invoking consume() or commit(). One only needs to explicitly invoke consume() or commit() if a basic_streambuf's buffer is provided directly to the operations.

As such, in the posted code, a double consume() is occurring, which could result in some data written to m_send_streambuf's input sequence from the removed before it has been written to the socket:

// The streambuf is provided directly to the operation, so the write
// operation will `consume()` the written data.
boost::asio::async_write(..., m_send_streambuf,
  [](boost::system::error_code ec, std::size_t len)
  {
    // Boost.Asio has already performed m_send_streambuf.consume(len) on
    // behalf of the user, as the streambuf was directly provided.
    ...

    // Data that is in the input sequence but has not yet been written
    // to the socket will be consumed.
    m_send_streambuf.consume(len);
  });

Either remove the explicit consume() or pass the input sequence (data()) to the write operation instead of the streambuf. For more details on streambuf, consider reading this answer.

Community
  • 1
  • 1
Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169
  • 1
    In fact, I started to use a queue, but I felt it clumsy to send elements of the queue one by one as I didn't find how to perform a gather-write with a `std::deque`. This is why I tried to use a `streambuf`. And thanks for the double `consume`, I wasn't aware of this. – Alexandre Hamez Oct 12 '15 at 20:00