7

Asio: Is there an automatically resizable buffer for receiving input?

I don't know in advance the size to be received, so I send this quantity in a header.

I've looked at http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/example/cpp03/chat/chat_message.hpp for an example using a header, but this example assumes the specification of a maximum body size.

Looking at the asio::buffer class, I must provide some underlying buffer, thus not flexible. Instead I've looked towards the asio::streambuf class, but using it as below gives segmentation/memory errors.

I try to give a maximum size to read only HEADER_LEN bytes i.e. the header.

Is this approach wrong?

void do_recv_header() 
{
    asio::streambuf buf(HEADER_LEN);
    asio::async_read(*g_selected_conn, buf, [this, &buf](const system::error_code& ec, std::size_t bytes_transferred)
    {
        if (ec != 0) {
            std::cout << "async_read() error: " << ec.message() << " (" << ec.value() << ") " << std::endl;
            remove_closed_conn(g_selected_conn);
            SetEvent(g_wait_event);
        }
        else {
            std::istream is(&buf);
            int body_len;
            is >> body_len;
            std::cout << body_len << std::endl;
            do_recv_body(body_len);
        }
    });
}
Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169
Shuzheng
  • 11,288
  • 20
  • 88
  • 186
  • Do you know at what line it segfaults? –  Mar 30 '16 at 10:17
  • Sorry no, but it has to do with the streambuf. – Shuzheng Mar 30 '16 at 10:24
  • First of all, why are you not using a regular fixed-size buffer if you already know how many octets/bytes you will fill it with? Secondly, I think that using `boost::asio::async_read_until` or `boost::asio::async_read_some` makes more sense with regards to your original problem. –  Mar 30 '16 at 10:37
  • I'm using a fixed size buffer, because I want async_read to stop only when it has read HEADER_LEN bytes. The above code actually reads the header, which is fixed size. – Shuzheng Mar 30 '16 at 12:07

1 Answers1

10

boost::asio::streambuf is an automatically resizable buffer class. This buffer type is often used when initiating a read operation whose completion is predicated on the data's content, and not necessarily the size of the data. For example, one may use boost::asio::read_until() to read until a newline, without knowing or specifying the how much data may be read.

In the case of an application protocol with a fixed size header that contains the length of the body, and the header is followed by a variable length body, consider using a buffer type, such as std::vector<>. This will provide the same level of flexibility as boost::asio::streambuf, while simplifying some of the bookkeeping:

std::vector<char> buffer;

// Read header.
buffer.resize(protocol::header_size);
boost::asio::read(socket, boost::asio::buffer(buffer));

// Extract body size from header, resize buffer, then read
// body.
auto body_size = parse_header(buffer);
buffer.resize(body_size);
boost::asio::read(socket, boost::asio::buffer(buffer));

process_body(buffer);

Not how resizing the vector indicates how much data will be read in the read operations. When using streambuf, one has to manage the input and output sequences directly with these operations:

boost::asio::streambuf streambuf;

// Read header into the streambuf's output sequence.
auto bytes_transferred = boost::asio::read(socket,
  streambuf.prepare(protocol::header_size));
// Commit read data from output sequence into the input
// sequence.
streambuf.commit(bytes_transferred);

// Extract body size from header.  This would likely
// consume all of the streambuf's input sequence.
auto body_size = parse_header(streambuf);
// Clear the input sequence.
streambuf.consume(streambuf.size());

// Ready body into the streambuf's output sequence.
bytes_transferred = boost::asio::read(socket,
  streambuf.prepare(body_size));
// Commit read data from output sequence into the input
// sequence.
streambuf.commit(bytes_transferred);

// Extract all of stream into the body.
process_body(streambuf);

Here is a complete example demonstrating this approach:

#include <array> // std::array
#include <functional> // std::bind
#include <iostream> // std::cout, std::endl
#include <vector> // std::vector
#include <boost/asio.hpp>

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

// The application protocol will consists of a fixed-size header
// containing a std::size_t with the length of the following
// variable length body.  To keep it simple, some details
// are ommitted, such as endian handling.
namespace protocol {
enum
{
  header_size = sizeof(std::size_t)
};
} // namespace protocol

std::vector<char> build_header(const std::string& body)
{
  std::vector<char> buffer(protocol::header_size);
  auto body_size = body.size();
  std::memcpy(&buffer[0], &body_size, sizeof body_size);
  return buffer;
}

std::size_t parse_header(const std::vector<char>& buffer)
{
  return *reinterpret_cast<const std::size_t*>(&buffer[0]);
}

int main()
{
  using boost::asio::ip::tcp;

  // Create all I/O objects.
  boost::asio::io_service io_service;
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket socket1(io_service);
  tcp::socket socket2(io_service);

  // Connect the sockets.
  acceptor.async_accept(socket1, std::bind(&noop));
  socket2.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();
  io_service.reset();

  //  Write a message from socket1 to socket2.
  std::string test_message = "this is a test message";
  {
    auto header = build_header(test_message);

    // Gather header and body into a single buffer.
    std::array<boost::asio::const_buffer, 2> buffers = {{
      boost::asio::buffer(header),
      boost::asio::buffer(test_message)
    }};

    // Write header and body to socket.
    boost::asio::write(socket1, buffers);
  }

  // Read from socket2.
  {
    // Use a vector to allow for re-sizing based on the
    // amount of data needing to be read.  This also reduces
    // on the amount of reallocations if the vector is reused.    
    std::vector<char> buffer;

    // Read header.
    buffer.resize(protocol::header_size);
    boost::asio::read(socket2, boost::asio::buffer(buffer));

    // Extract body size from header, resize buffer, then read
    // body.
    auto body_size = parse_header(buffer);
    buffer.resize(body_size);
    boost::asio::read(socket2, boost::asio::buffer(buffer));

    // Verify body was read.
    assert(std::equal(begin(buffer), end(buffer), 
                      begin(test_message)));
    std::cout << "received: \n"
                 "  header: " << body_size << "\n"
                 "  body: ";
    std::cout.write(&buffer[0], buffer.size());
    std::cout << std::endl;
  }
}

Output:

received: 
  header: 22
  body: this is a test message
Tanner Sansbury
  • 51,153
  • 9
  • 112
  • 169
  • Excellent! Could it be done using streambuf? I'm confused due to the input and output buffer combination of streambuf... Could the parse header function be rewritten using atoi()? You are using that a vector is implemented as an array, right? I was not aware that a vector could be used like this in buffer(). – Shuzheng Mar 30 '16 at 18:12
  • Why do you call reset on io_service? – Shuzheng Mar 30 '16 at 18:13
  • @Nicolas It certainly can be done with a streambuf and atoi ([demo](http://coliru.stacked-crooked.com/a/03f9f61943f979cd)). This [answer](http://stackoverflow.com/a/31992879/1053968) may help with understanding `streambuf`. – Tanner Sansbury Mar 30 '16 at 19:29
  • Thank you a lot. Why is the reset of io_service necessary? – Shuzheng Mar 30 '16 at 20:34
  • @Nicolas In this context, it is not necessary. It would have been necessary had I used asynchronous operations after `run()` had returned. – Tanner Sansbury Mar 30 '16 at 20:54