6

I am using Google Protocol Buffers and Python to decode some large data files, about 200 MB each. The code below shows how to decode a delimited stream, and it works just fine. However, it uses read(), which loads the entire file into memory before iterating over it.

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read()  # PROBLEM: loads the entire file into memory
    n = 0
    while n < len(buf):
        msg_len, new_pos = _DecodeVarint32(buf, n)
        n = new_pos
        msg_buf = buf[n:n+msg_len]
        n += msg_len
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(msg_buf)
        # do something with read_row
        print(read_row)

Note that this code comes from another SO post, but I don't remember the exact url. Is there a readlines() equivalent for protocol buffers that lets me read in one delimited message at a time and decode it? I basically want a pipeline that is not limited by the amount of RAM needed to load the whole file.

It seems there was a pystream-protobuf package that supported some of this functionality, but it has not been updated in a year or two. There is also a post from seven years ago that asked a similar question, but I was wondering whether anything has changed since then:

python example for reading multiple protobuf messages from a stream

Konrad Rudolph
krishnab
  • I think the SO question may have been this one: https://stackoverflow.com/questions/11484700/python-example-for-reading-multiple-protobuf-messages-from-a-stream – drevicko May 25 '20 at 08:09

2 Answers

5

If it is ok to load one full message at a time, this is quite simple to implement by modifying the code you posted:

import feed_pb2 as sfeed
import sys
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32

with open('/home/working/data/feed.pb', 'rb') as f:
    buf = f.read(10)  # 10 bytes is more than enough to hold any varint length prefix
    while buf:
        msg_len, new_pos = _DecodeVarint32(buf, 0)
        buf = buf[new_pos:]
        # read the rest of the message (no-op if it is already fully buffered)
        buf += f.read(max(0, msg_len - len(buf)))
        read_row = sfeed.standard_feed()
        read_row.ParseFromString(buf[:msg_len])
        buf = buf[msg_len:]
        # do something with read_row
        print(read_row)
        # top the buffer back up for the next length prefix
        buf += f.read(max(0, 10 - len(buf)))

This reads 10 bytes, which is enough to parse the length prefix, and then reads the rest of the message once its length is known.

The bytes slicing and concatenation above are not very efficient in Python (each operation copies the data), so using a bytearray as the buffer can improve performance if your individual messages are also large.
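
As a rough illustration of that suggestion, here is a sketch of the same loop wrapped in a generator that mutates a bytearray in place. The read_delimited name and the 10-byte refill size are my own choices for this sketch, not part of any library API:

import feed_pb2 as sfeed
from google.protobuf.internal.decoder import _DecodeVarint32

def read_delimited(path, message_type):
    """Yield parsed messages from a varint-delimited protobuf file, one at a time."""
    with open(path, 'rb') as f:
        buf = bytearray(f.read(10))  # enough bytes to hold any varint length prefix
        while buf:
            msg_len, new_pos = _DecodeVarint32(bytes(buf), 0)
            del buf[:new_pos]                          # drop the length prefix in place
            buf += f.read(max(0, msg_len - len(buf)))  # pull in the rest of the message
            msg = message_type()
            msg.ParseFromString(bytes(buf[:msg_len]))
            del buf[:msg_len]                          # drop the consumed message bytes
            buf += f.read(max(0, 10 - len(buf)))       # refill for the next length prefix
            yield msg

for row in read_delimited('/home/working/data/feed.pb', sfeed.standard_feed):
    print(row)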

jpa
  • Excellent @jpa. This is very helpful. I was just hoping to understand the options in case my data pipeline grows larger than I expect. So this is really useful. I will look into `bytearray` as you suggest. – krishnab Oct 18 '19 at 21:00
  • this is great, except that it still reads the whole file into memory (unless you e.g. quit after finding the one you want). If you know (or can estimate) the max size of a message, you could use [collections.deque](http://docs.python.org/library/collections.html#collections.deque). See [this answer](https://stackoverflow.com/a/4151368/420867) for a nice example of its usage. – drevicko May 25 '20 at 08:26
  • NameError: name 'n' is not defined. I believe there's a typo in the code. How much is n, 10? – Riccardo Mar 30 '21 at 07:32
  • @Riccardo I think 0. – jpa Mar 30 '21 at 08:32
2

https://github.com/cartoonist/pystream-protobuf/ was updated 6 months ago. I haven't tested it much so far, but it seems to work fine as-is, without needing any update. It provides optional gzip compression and async support.
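
For reference, here is a minimal usage sketch based on the project's README; the file path and standard_feed message type are carried over from the question, so treat the exact API as an assumption rather than tested code:

import stream  # pip install pystream-protobuf
import feed_pb2 as sfeed

# Iterate over the messages in the file without loading it all at once;
# the library yields the raw serialized bytes of each message.
with stream.open('/home/working/data/feed.pb', 'rb') as istream:
    for data in istream:
        row = sfeed.standard_feed()
        row.ParseFromString(data)
        print(row)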

jrouquie