
Sorry to ask this again, but could we please, once and for all, post some C++ functions that match the Java ones here? They don't seem to be getting added by Google, and it is quite a pain to write them yourself. The answer below uses some of this and this.

Cookie
  • Which line results in the posted error? – Simon Nov 25 '11 at 13:15
  • Good call, changing it to `boost::asio::write(socket,request)` compiles... – Cookie Nov 25 '11 at 13:21
  • Anyone know how to read into the CodedInputStream? So far I had been using `boost::asio::async_read( socket, boost::asio::buffer(buf), boost::asio::transfer_at_least(1), boost::bind(&Session::Handle_Read,shared_from_this(),boost::asio::placeholders::error));` but clearly that needs to change – Cookie Nov 25 '11 at 13:42
  • Read the prefixed length, then read the rest. – Simon Nov 25 '11 at 13:56
  • So I am actually quite stuck on reading... the problem is that I currently use above call to read into a buffer. Theoretically, I could now do `google::protobuf::io::ArrayInputStream arrayInputStream(buf,5000); google::protobuf::io::CodedInputStream codedInputStream(&arrayInputStream);`, giving me the CodedInputStream, and then reading both parts from there. But the whole point of this exercise is to prepend the size so we know how much to read out of the socket, in case messages got split. Anyone done this already? – Cookie Nov 25 '11 at 14:21
  • Basically, you want to keep async read so a server can handle multiple clients. But then you only want to read the size out of the buffer, check whether the whole message is in the remainder of the buffer, if not, read what is there and do a sync read call to the socket for the rest... I think... I am going to leave this for another day, if someone else wants to have a go at it – Cookie Nov 25 '11 at 14:22

2 Answers


Here are the two basic versions with Boost.Asio. Note that, to make the second version work properly, one would need to look at how much data is in the buffer and work out how big the header was (a varint is not a fixed size). CodedInputStream has GetDirectBufferPointer, which gives a pointer to its current position, so from this pointer one could work out how much of the message is already in the buffer, compare that with the given message size, construct a new adjusted buffer for the remaining size, and do an asio synchronous read for the rest of the message. The code below works as long as messages remain small (I guess around 1 KB or so). If someone has the missing bit, please speak up. Thanks.

writeDelimitedTo in C++:

boost::asio::streambuf request;
{
    std::ostream request_stream(&request);

    google::protobuf::io::OstreamOutputStream raw_output (&request_stream);
    google::protobuf::io::CodedOutputStream coded_output(&raw_output);

    coded_output.WriteVarint32(myProtoMsg.ByteSize());
    myProtoMsg.SerializeToCodedStream(&coded_output);
}
boost::asio::write(socket,request);
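
To make the wire format concrete, here is a minimal sketch of the framing the snippet above is meant to produce: the payload length as a base-128 varint, followed by the payload bytes. It deliberately avoids protobuf so it can be compiled on its own. `AppendVarint32` and `FrameDelimited` are hypothetical helper names for illustration, not protobuf functions, and the payload is assumed to be an already serialized message held in a `std::string`.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper (not a protobuf API): append `value` to `out` as a
// base-128 varint, 7 bits per byte, least significant group first. Every
// byte except the last has its high bit set.
void AppendVarint32(uint32_t value, std::string& out) {
    while (value >= 0x80) {
        out.push_back(static_cast<char>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<char>(value));
}

// Hypothetical helper: build one delimited frame, i.e. varint(length)
// followed by the payload bytes. Assumes the payload fits in 32 bits.
std::string FrameDelimited(const std::string& payload) {
    std::string frame;
    AppendVarint32(static_cast<uint32_t>(payload.size()), frame);
    frame += payload;
    return frame;
}
```

A 3-byte payload gets a 1-byte header, while a 300-byte payload gets a 2-byte header (0xAC 0x02), which is why the reader cannot assume a fixed header size.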

parseDelimitedFrom:

char buf[5000];
void Session::Read()
{
    boost::asio::async_read( 
        socket,
        boost::asio::buffer(buf),
        boost::asio::transfer_at_least(1),
        boost::bind(&Session::Handle_Read,shared_from_this(),boost::asio::placeholders::error));
}
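void Session::Handle_Read(const boost::system::error_code& error)
{
    if (!error)
    {
        // Assumes one whole message (header and body) arrived in this single read.
        google::protobuf::io::ArrayInputStream arrayInputStream(buf,sizeof(buf));
        google::protobuf::io::CodedInputStream codedInputStream(&arrayInputStream);
        uint32_t messageSize;
        if (codedInputStream.ReadVarint32(&messageSize))
        {
            //Read more here
            MyProtoMsg myProtoMsg;
            myProtoMsg.ParseFromCodedStream(&codedInputStream);
        }
        // Only start the next read on success; re-reading after an error
        // would keep invoking this handler with the same error.
        Read();
    }
}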

EDIT: Above is a bit lazy (with "read more here"). Below is a complete parseDelimitedFrom. Any comments welcome.

NEW parseDelimitedFrom:

static void ReadMyVarint32(int& headerSize,int& messageSize,char buffer[])
{
    // Fast path:  buf is much larger than the 10 bytes a varint can occupy,
    // so this read won't cross the end and we can skip the bounds checks.
    // Bytes are read as unsigned so the masks and shifts are well defined.
    const uint8_t* const start = reinterpret_cast<const uint8_t*>(buffer);
    const uint8_t* ptr = start;
    uint32_t b;
    uint32_t result;

    b = *(ptr++); result  = (b & 0x7F)      ; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) <<  7; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) << 14; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |= (b & 0x7F) << 21; if (!(b & 0x80)) goto done;
    b = *(ptr++); result |=  b         << 28; if (!(b & 0x80)) goto done;

    // If the input is larger than 32 bits, we still need to read it all
    // and discard the high-order bits.
    for (int i = 0; i < 5; i++) {
        b = *(ptr++); if (!(b & 0x80)) goto done;
    }

    // We have overrun the maximum size of a varint (10 bytes).  Assume
    // the data is corrupt and return before "done" overwrites the zeros.
    headerSize = 0;
    messageSize = 0;
    return;

done:
    headerSize = static_cast<int>(ptr - start);
    messageSize = static_cast<int>(result);
}

char buf[5000];
char request[5000]; // reassembly buffer for one message (declaration assumed; sized to match buf)
int receivedSize(0);
int missingSize(0);

void Session::Read()
{
    boost::asio::async_read( 
        socket,
        boost::asio::buffer(buf),
        boost::asio::transfer_at_least(1),
        boost::bind(&Session::Handle_Read,shared_from_this(),_1,_2));
}

void Session::Handle_Read(const boost::system::error_code& error,std::size_t bytes_transferred)
{
    if (!error)
    {
        int mybytes_transferred((int)bytes_transferred);
        if(missingSize == 0)
        {
            int headerSize, messageSize;
            ReadMyVarint32(headerSize,messageSize,buf);
            //std::cout << "Read new message: HeaderSize " << headerSize << " MessageSize " << messageSize << " Received: " << mybytes_transferred << std::endl;

            for(int i(0);i<mybytes_transferred-headerSize;++i)
                request[i] = buf[headerSize+i];

            missingSize = headerSize + messageSize - mybytes_transferred;
            receivedSize = mybytes_transferred - headerSize;
        }
        else
        {
            //std::cout << "Continue message: Read so far " << receivedSize << " Missing " << missingSize << " Received: " << mybytes_transferred << std::endl;
            for(int i(0);i<mybytes_transferred;++i)
                request[receivedSize+i] = buf[i];
            missingSize -= mybytes_transferred;
            receivedSize += mybytes_transferred;
        }
        if(missingSize < 0)
        {
            //Received too much, give up
            missingSize = 0;
            receivedSize = 0;
        }
        else if(missingSize == 0)
        {
            // Use your proto class here
            RequestWrapperPtr requestWrapperPtr(new RequestWrapper());
            if(requestWrapperPtr->ParseFromArray(request,receivedSize))
            {
                HandleRW(requestWrapperPtr);
            }
            else
            {
                // std::cout  << BaseString() << "Session Handle_Read: Failed to parse!";
            }
        }
        Read();
    }
}
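
The handler above gives up when a read delivers more than one message and assumes the varint header arrives in a single read. As a rough sketch of one way around both limits, the class below accumulates whatever each read returned and hands back every complete payload. It uses only the standard library so it can be tested without a socket or protobuf. `TryReadVarint32` and `FrameAssembler` are hypothetical names, and treating a length prefix longer than 5 bytes as corrupt is an assumption of this sketch.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: decode a varint32 from the front of `data`.
// Returns the number of header bytes used, 0 if more bytes are needed,
// or -1 if 5 bytes were seen without a terminating byte (assumed corrupt).
int TryReadVarint32(const std::string& data, uint32_t& value) {
    uint32_t result = 0;
    for (std::size_t i = 0; i < data.size() && i < 5; ++i) {
        const uint32_t b = static_cast<unsigned char>(data[i]);
        result |= (b & 0x7F) << (7 * i);
        if (!(b & 0x80)) {
            value = result;
            return static_cast<int>(i + 1);
        }
    }
    return data.size() >= 5 ? -1 : 0;
}

// Hypothetical accumulator: feed it the bytes of each read and it returns
// the payloads of all frames that are now complete, keeping any remainder.
class FrameAssembler {
public:
    std::vector<std::string> Feed(const char* bytes, std::size_t n) {
        pending_.append(bytes, n);
        std::vector<std::string> out;
        for (;;) {
            uint32_t size = 0;
            const int header = TryReadVarint32(pending_, size);
            if (header < 0) { corrupt_ = true; pending_.clear(); break; }
            if (header == 0) break;  // header itself is still incomplete
            const std::size_t headerLen = static_cast<std::size_t>(header);
            if (pending_.size() - headerLen < size) break;  // body incomplete
            out.push_back(pending_.substr(headerLen, size));
            pending_.erase(0, headerLen + size);
        }
        return out;
    }
    bool corrupt() const { return corrupt_; }

private:
    std::string pending_;   // bytes received but not yet consumed
    bool corrupt_ = false;  // set once a malformed header is seen
};
```

In a handler like Handle_Read, one would call `Feed(buf, bytes_transferred)` and pass each returned string to `ParseFromArray` (or `ParseFromString`) before starting the next read. In real code the decoded size should also be checked against a maximum before buffering that much data.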
Cookie
  • your "writeDelimitedTo" is wrong. In my experience you have to destruct the coded and io stream before you serialise your object to the stream because coded output stream and io stream only write their data upon destruction. Your example would end up writing the object followed by the size. – ScaryAardvark Oct 15 '12 at 13:07
  • Ooops.. My mistake. Your code is correct as you're writing to the CodedOutputStream. The code I had was using the CodedOutputStream to write the varint and then serialising to an OstreamOutputStream. – ScaryAardvark Oct 15 '12 at 13:19
  • Why don't we use the messageSize after its assignment? – Vitaly Isaev Nov 07 '13 at 08:51
  • It seems to me there should be a PushLimit(int byte_limit) after reading messageSize?? am I wrong on this? – Ben Dec 09 '13 at 21:28
  • @VitalyIsaev,@Ben: Here you go... a version with the message size used – Cookie Dec 12 '13 at 16:47

I know this question has been answered, but the question is super old now, and updates have been made to the protobuf code repository since then.

Protobuf now has these functions under the header file here:

#include <google/protobuf/util/delimited_message_util.h>

You can check out the associated cpp file on the git repo here. I have tried out the functions with a boost socket and they work as expected.

robotsfoundme