I have a pipeline as follows:
import base64
import gzip
import logging

import apache_beam as beam
import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError


class GetClearMessage(beam.DoFn):
    def process(self, element, **kwargs):
        """Parse the encoded proto.

        Returns a decoded BatchEntryPoint instance.
        """
        logging.info('Unserializing data')
        logging.info(element)
        batch_entry_point = common_pb2.BatchEntryPoint()
        data = element.data
        logging.info(data)
        try:
            batch_entry_point.ParseFromString(data)
        except DecodeError:
            # The payload may be gzip-compressed; decompress and retry.
            unzipped = gzip.decompress(data)
            batch_entry_point.ParseFromString(unzipped)
        logging.info(batch_entry_point)
        return [batch_entry_point]
def batch_pipeline(pipeline):
    console_message = (
        pipeline
        | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
            subscription='projects/production-213911/subscriptions/ps-to-bq-console',
            with_attributes=True)
    )
    clear_message = console_message | beam.ParDo(GetClearMessage())
    gcloud_id = console_message | beam.ParDo(GetGcloudId())
    registry = console_message | beam.ParDo(GetTableData())
    # clear_message | beam.ParDo(Test())
I removed some of the classes because they are not necessary to understand the issue.
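For completeness, this is roughly how the pipeline is launched on Dataflow (a simplified, approximate sketch; the option values below are illustrative, not the exact ones I use):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # Illustrative options only; the real job sets more Dataflow settings.
    options = PipelineOptions(
        streaming=True,
        runner='DataflowRunner',
        project='production-213911',
        # setup_file='./setup.py',  # would ship data.build.common.* to the workers
    )
    with beam.Pipeline(options=options) as pipeline:
        batch_pipeline(pipeline)

if __name__ == '__main__':
    run()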
When I run my pipeline on Dataflow, I regularly get this error:
Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-4468']
See the full stack trace below.
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
element.data)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-314']
But as you may have seen, GetClearMessage logs some data, and when I look at the logs for this specific step everything seems fine, even when I log batch_entry_point, which is an instance of BatchEntryPoint, the class causing trouble.
Any idea what's causing this behavior?
EDIT
I tried adding a step on the output of ParDo(GetClearMessage), and that step is never reached. So I guess the pickling error comes from the fact that I return an instance of BatchEntryPoint from the DoFn.
I do not understand this behavior. Do you know how to fix it?
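Would the right fix be to tell Beam explicitly how to encode BatchEntryPoint instead of letting it fall back to pickling? Something like the sketch below (untested, just an idea based on Beam's ProtoCoder; the step label is made up):

import apache_beam as beam
import data.build.common.v1.common_pb2 as common_pb2

# Untested idea: register a ProtoCoder so Beam serializes BatchEntryPoint
# with protobuf instead of pickling the generated class.
beam.coders.registry.register_coder(
    common_pb2.BatchEntryPoint, beam.coders.ProtoCoder)

# ...and declare the output type on the ParDo so the coder gets used:
clear_message = (
    console_message
    | 'Parse console message' >> beam.ParDo(GetClearMessage()).with_output_types(
        common_pb2.BatchEntryPoint))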
Thanks