I'm building a pipeline on Google Cloud Dataflow (Python SDK) to process gzip-compressed JSON messages. More precisely, the JSON files are grouped in fours and gzip-compressed before being published to Google Cloud Pub/Sub. Once they are published, however, I can't work out how to decompress them in the pipeline and then create a PCollection for each JSON message (i.e. 4 PCollections).
If I log the PCollection after the ReadFromPubSub step, I get something like this:

INFO:root:b'\x1f\xe2\x80\xb9\x08\...\x06\x00\x00'  

I assume this is the body of the message (a bytes object).
The pipeline is the following:

with beam.Pipeline(options=pipeline_options) as pipeline:
    events = (
        pipeline
        | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=params["input_subscription"])
        | 'Logging zipped' >> beam.ParDo(beam_utils.LogFn())            
        | 'Uncompress data' >> beam.ParDo(UncompressData())
    )  

where:

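import zlib
import apache_beam as beam

class UncompressData(beam.DoFn):
    def process(self, element):
        # MAX_WBITS | 32 asks zlib to auto-detect a zlib or gzip header.
        decompressed_byte_data = zlib.decompress(element, zlib.MAX_WBITS | 32)
        yield decompressed_byte_data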

But this doesn't work: it raises a zlib.error (the same happens with zlib.MAX_WBITS|16):

zlib.error: Error -3 while decompressing data: incorrect header check [while running 'Uncompress data']
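
One detail in the logged bytes may be relevant here. A gzip stream starts with `\x1f\x8b\x08`, whereas the payload above starts with `\x1f\xe2\x80\xb9\x08`. The sequence `\xe2\x80\xb9` is the UTF-8 encoding of U+2039, which is the character Windows-1252 assigns to byte 0x8B, so the binary data may have been treated as Windows-1252 text and re-encoded as UTF-8 somewhere before publishing. The following is only a sketch under that assumption: `undo_cp1252_utf8_roundtrip` and `simulate_roundtrip` are hypothetical helpers, not part of Beam or Pub/Sub, and the fallback to Latin-1 for the five bytes Windows-1252 leaves undefined is a guess about how the external tool behaves.

```python
import gzip


def undo_cp1252_utf8_roundtrip(payload: bytes) -> bytes:
    """Hypothetical helper: map each UTF-8 decoded character back to one byte."""
    out = bytearray()
    # Raises UnicodeDecodeError if the payload is not valid UTF-8 at all.
    for ch in payload.decode("utf-8"):
        try:
            out += ch.encode("cp1252")
        except UnicodeEncodeError:
            # Assumption: bytes with no cp1252 mapping (0x81, 0x8D, ...) were
            # passed through as the code points U+0081, U+008D, ...
            # Any other character raises here, meaning the assumption is wrong.
            out += ch.encode("latin-1")
    return bytes(out)


def simulate_roundtrip(raw: bytes) -> bytes:
    """Reproduce the suspected damage, only to exercise the helper above."""
    chars = []
    for value in raw:
        one = bytes([value])
        try:
            chars.append(one.decode("cp1252"))
        except UnicodeDecodeError:
            chars.append(one.decode("latin-1"))
    return "".join(chars).encode("utf-8")


if __name__ == "__main__":
    original = gzip.compress(b'{"id": 1}\n{"id": 2}')
    damaged = simulate_roundtrip(original)
    repaired = undo_cp1252_utf8_roundtrip(damaged)
    # The repaired bytes carry the gzip magic number again and decompress.
    assert repaired[:2] == b"\x1f\x8b"
    assert gzip.decompress(repaired) == b'{"id": 1}\n{"id": 2}'
```

If this matches the real data, the helper could be called on `element` inside `UncompressData.process` before `zlib.decompress`. If the conversion replaced undecodable bytes with a placeholder character instead, the original bytes cannot be recovered on the consumer side, and the publisher would need to send the raw gzip bytes unchanged.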

Has anyone faced a similar problem before? Any suggestion or solution is welcome!

Konrad Rudolph
  • Hey Federico, does https://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check help you out? – Cubez Sep 29 '20 at 18:05
  • I tried all the solutions listed in that post, but none of them works. Using decompress returns "error -3", and using read() returns "embedded null byte" or "Not a gzipped file (b'\x1f\xe2')". – Federico Barusco Sep 29 '20 at 20:05
  • Maybe the issue is on the compression side. Did you compress the data with a compression object like `zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)`? – Cubez Sep 29 '20 at 22:11
  • No, the gzip compression is performed using an external tool. – Federico Barusco Oct 02 '20 at 09:21
  • Which external tool was it, exactly? I'm now trying to decompress very similar data: `\x1f\xe2\x80\xb9\x08\x00\x00\x00\x00\x00\...x03\x00\x00'`. Have you solved your issue? – SKulibin May 09 '21 at 23:57
  • I have almost the same issue: https://reverseengineering.stackexchange.com/questions/27634/help-me-please-decode-the-string-looks-like-gzip-but-it-is-not . I'm still trying to solve it, without any success so far. – SKulibin May 10 '21 at 09:17

0 Answers