
I'm working with data from spinn3r, which consists of multiple different protobuf messages serialized into a byte stream:

http://code.google.com/p/spinn3r-client/wiki/Protostream

"A protostream is a stream of protocol buffer messages, encoded on the wire as length prefixed varints according to the Google protocol buffer specification. The stream has three parts: a header, the payload, and a tail marker."

This seems like a pretty standard use case for protobufs. In fact, the protobuf core distribution provides CodedInputStream for both C++ and Java. But it appears that protobuf does not provide such a tool for Python -- the 'internal' tools are not set up for this kind of external use:

https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o

So... before I go and cobble together a python varint parser and tools for parsing a stream of different message types: does anyone know of any tools for this?

Why is it missing from protobuf? (Or am I just failing to find it?)

This seems like a big gap for protobuf, especially when compared to thrift's equivalent tools for both 'transport' and 'protocol'. Am I viewing that correctly?

ttst

3 Answers

16

It looks like the code in the other answer is potentially lifted from here. Check the licence before using that file, but I managed to get it to read varint32s using code such as this:

import sys
import myprotocol_pb2 as proto
import varint # (this is the varint.py file)

data = open("filename.bin", "rb").read() # read the whole file as a byte string
decoder = varint.decodeVarint32          # get a varint32 decoder
                                         # (others are available in varint.py)

pos = 0
while pos < len(data):
    msg_len, pos = decoder(data, pos)    # msg_len = size of the next message; pos now points at its first byte
    msg = proto.Msg()                    # your message type
    msg.ParseFromString(data[pos:pos + msg_len])

    # use the parsed message here

    pos += msg_len                       # skip past the message body
print("done!")

This is very simple code designed to load messages of a single type, each prefixed by a varint32 giving the size of the message that follows.
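
For completeness, the matching write side can be sketched the same way. This is only an illustration: it relies on the internal _VarintBytes helper from google.protobuf.internal.encoder (an internal module, so it may change between protobuf releases), and `messages` is a hypothetical iterable of already-built proto.Msg objects:

from google.protobuf.internal.encoder import _VarintBytes  # internal API, may change
import myprotocol_pb2 as proto                             # placeholder message module, as above

messages = [proto.Msg(), proto.Msg()]       # hypothetical: whatever messages you want to write
with open("filename.bin", "wb") as f:
    for msg in messages:
        body = msg.SerializeToString()      # serialize one message
        f.write(_VarintBytes(len(body)))    # varint32 length prefix
        f.write(body)                       # then the message bytes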


Update: It may also be possible to import the decoder directly from the protobuf library using:

from google.protobuf.internal.decoder import _DecodeVarint32
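
A minimal self-contained sketch using that import would look like the following. Treat it as a sketch: myprotocol_pb2 and Msg are the same placeholders as above, and _DecodeVarint32 lives in an internal module, so its location and signature may change between protobuf releases:

from google.protobuf.internal.decoder import _DecodeVarint32  # internal API, may change
import myprotocol_pb2 as proto                                 # placeholder message module, as above

with open("filename.bin", "rb") as f:
    data = f.read()

pos = 0
while pos < len(data):
    msg_len, pos = _DecodeVarint32(data, pos)  # returns (length of next message, position after the varint)
    msg = proto.Msg()                          # your message type
    msg.ParseFromString(data[pos:pos + msg_len])
    pos += msg_len                             # skip past the message body
    # use the parsed message here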
Dragos
  • For newer stuff, I get the decoder via `from google.protobuf.internal.decoder import _DecodeVarint32` – Peter Ehrlich Dec 07 '16 at 01:02
  • I've had to do this a number of times recently. I wasn't able to figure out the `msg.type == proto.Msg.END` condition, but simply doing `while pos < len(data):` works great for me. – Moodragonx May 06 '17 at 21:35
  • Of course, makes sense, answer updated. Thanks @Moodragonx – Dragos May 09 '17 at 08:36
  • just curious, this looks to be creating a new object of type `Msg` in each loop, what was Google's rationale for this? Is this overhead expensive? – Tommy Dec 11 '17 at 19:52
  • @Tommy To clarify this isn't google code, this is my code as an example of how one might solve this problem in Python. I am not a Python expert but even if you clear the Message object to reuse it, its binary data will still have to be copied from the stream anyway. If you want a no-copy solution perhaps Python isn't the way to go; I primarily use protobufs in C++. – Dragos Dec 13 '17 at 13:41
7

I've implemented a small Python package to serialize multiple protobuf messages into a stream and deserialize them from a stream. You can install it with pip:

pip install pystream-protobuf

Here's sample code writing two lists of protobuf messages into a file:

import stream

with stream.open("test.gam", "wb") as ostream:
    ostream.write(*objects_list)
    ostream.write(*another_objects_list)

and then reading the same messages (e.g. Alignment messages defined in vg_pb2.py) from the stream:

import stream
import vg_pb2

alns_list = []
with stream.open("test.gam", "rb") as istream:
    for data in istream:
        aln = vg_pb2.Alignment()
        aln.ParseFromString(data)
        alns_list.append(aln)
cartoonist
-3

This is simple enough that I can see why maybe nobody has bothered to make a reusable tool:

'''
Parses multiple protobuf messages from a stream of spinn3r data
'''

import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2

from google.protobuf.message import DecodeError

data = open('8mny44bs6tYqfnofg0ELPg.protostream', 'rb').read()  # read the whole protostream as bytes

def _VarintDecoder(mask):
    '''Return a varint decoder for the given bit mask (adapted from protobuf's internal decoder).'''

    local_ord = ord
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            b = local_ord(buffer[pos])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                if result > 0x7fffffffffffffff:
                    result -= (1 << 64)
                    result |= ~mask
                else:
                    result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise DecodeError('Too many bytes when decoding varint.')
    return DecodeVarint

## get a 64bit varint decoder
decoder = _VarintDecoder((1<<64) - 1)

## get the three types of protobuf messages we expect to see
header    = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry     = spinn3rApi_pb2.Entry()

## get the header
pos = 0
msg_len, pos = decoder(data, pos)        # size of the header message; pos now points at its first byte
header.ParseFromString(data[pos:pos + msg_len])
## should check its contents

while True:
    pos += msg_len                       # skip past the previous message
    msg_len, pos = decoder(data, pos)
    delimiter.ParseFromString(data[pos:pos + msg_len])

    if delimiter.delimiter_type == delimiter.END:
        break

    pos += msg_len
    msg_len, pos = decoder(data, pos)
    entry.ParseFromString(data[pos:pos + msg_len])
    print(entry)
Jesse Chisholm
ttst