I'm building a pipeline to process compressed JSON messages with Google Cloud Dataflow (Python SDK). More precisely, the JSON files are grouped (in groups of 4) and gzip-compressed before being published to Google Cloud Pub/Sub. Once they are published, however, I don't understand how to decompress them and then create a PCollection for each JSON message (i.e. 4 PCollections).
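For context, the publishing side does roughly the following (a simplified sketch only; publish_batch and the exact serialization are illustrative, not my production code):

import gzip
import json
from google.cloud import pubsub_v1

# Illustrative sketch of the publisher (hypothetical names): four JSON documents
# are serialized, concatenated, gzip-compressed and published as one Pub/Sub message.
def publish_batch(project_id, topic_id, json_docs):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    payload = "\n".join(json.dumps(doc) for doc in json_docs).encode("utf-8")
    compressed = gzip.compress(payload)
    future = publisher.publish(topic_path, data=compressed)
    return future.result()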
If I log the PCollection after the ReadFromPubSub step, I get something like this:
INFO:root:b'\x1f\xe2\x80\xb9\x08\...\x06\x00\x00'
I suppose this is the body of the message (a bytes object).
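The logging helper beam_utils.LogFn is just something along these lines (a sketch; my actual helper may differ in details):

import logging
import apache_beam as beam

# Rough equivalent of beam_utils.LogFn: log each element and pass it through unchanged.
class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        yield element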
The pipeline is the following:
with beam.Pipeline(options=pipeline_options) as pipeline:
    events = (
        pipeline
        | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=params["input_subscription"])
        | 'Logging zipped' >> beam.ParDo(beam_utils.LogFn())
        | 'Uncompress data' >> beam.ParDo(UncompressData())
    )
where:
class UncompressData(beam.DoFn):
    def process(self, element):
        # zlib.MAX_WBITS | 32 enables automatic header detection (zlib or gzip)
        decompressed_byte_data = zlib.decompress(element, zlib.MAX_WBITS | 32)
        yield decompressed_byte_data
but it doesn't work and raises a zlib.error (the same happens with zlib.MAX_WBITS|16):
zlib.error: Error -3 while decompressing data: incorrect header check [while running 'Uncompress data']
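For reference, this is the kind of standalone check I would expect to succeed if the message body were plain gzip bytes (a minimal sketch, independent of the pipeline):

import gzip
import zlib

# zlib with MAX_WBITS | 32 auto-detects zlib and gzip headers, so decompressing
# a freshly gzip-compressed payload should round-trip without errors.
original = b'{"key": "value"}'
compressed = gzip.compress(original)
assert zlib.decompress(compressed, zlib.MAX_WBITS | 32) == original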
Has anyone faced a similar problem before? Any suggestion or solution is welcome!