
I have a pipeline as follows:

import base64
import gzip
import logging
import apache_beam as beam

import data.build.common.v1.common_pb2 as common_pb2
from data.pipeline.steps.console_proto_list import CONSOLE_PROTO_LIST
from google.protobuf.message import DecodeError

class GetClearMessage(beam.DoFn):
    def process(self, element, **kwargs):
        """ Parse encoded proto 
        Returns an instance of EntryPoint decoded.
        """
        logging.info('Unserializing data')
        logging.info(element)
        batch_entry_point = common_pb2.BatchEntryPoint()
        data = element.data
        logging.info(data)
        try:
            batch_entry_point.ParseFromString(data)
        except DecodeError:
            unzipped = gzip.decompress(data)
            batch_entry_point.ParseFromString(unzipped)
        logging.info(batch_entry_point)
        return [batch_entry_point]

def batch_pipeline(pipeline):
    console_message = (
            pipeline
        | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
            subscription='projects/production-213911/subscriptions/ps-to-bq-console',
            with_attributes=True)
    )

    clear_message = console_message | beam.ParDo(GetClearMessage())
    gcloud_id = console_message | beam.ParDo(GetGcloudId())
    registry = console_message | beam.ParDo(GetTableData())
    #clear_message | beam.ParDo(Test())

I removed some of the classes because they are not necessary to understand the issue.

When I run my pipeline on Dataflow, I regularly get this error:

Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-4468']

See the full stack trace below.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed

During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
        response = task()
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
        lambda: self.create_worker().do_instruction(request), request)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
        getattr(request, request_type), request.instruction_id)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
        bundle_processor.process_bundle(instruction_id))
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
        element.data)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
        self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
        raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
      File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
      File "apache_beam/runners/worker/operations.py", line 156, in apache_beam.runners.worker.operations.ConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 184, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
      File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
      File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
      File "apache_beam/coders/coder_impl.py", line 1277, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 1288, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 390, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
      File "apache_beam/coders/coder_impl.py", line 446, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
      File "apache_beam/coders/coder_impl.py", line 257, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
      File "/usr/local/lib/python3.6/site-packages/apache_beam/coders/coders.py", line 764, in <lambda>
        lambda x: dumps(x, protocol), pickle.loads)
    _pickle.PicklingError: Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed [while running 'ParDo(GetClearMessage)-ptransform-314']

But as you may have seen in GetClearMessage, I log some data, and when I look at the logs for this specific step everything seems fine, even when I log batch_entry_point, which is an instance of BatchEntryPoint, the class causing trouble.
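
For what it's worth, my understanding is that the same failure can be reproduced with plain pickle, outside Beam. A quick check, assuming my module layout (the printed module path may differ elsewhere):

    import pickle
    import data.build.common.v1.common_pb2 as common_pb2

    bep = common_pb2.BatchEntryPoint()

    # The generated class reports a module path of its own, not necessarily
    # the path it was imported under:
    print(type(bep).__module__)  # 'common.v1.common_pb2' per the stack trace

    # Pickling the instance pickles its class by reference, which forces
    # pickle to re-import that exact module path; if 'common.v1.common_pb2'
    # is not importable, this raises the same PicklingError as on the workers.
    pickle.dumps(bep)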

Any idea what's causing this behavior?


EDIT

I tried to add a step on the result of ParDo(GetClearMessage), and that step is never reached. So I guess the pickling error comes from the fact that I return an instance of BatchEntryPoint.
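
From what I understand, Beam's default FastPrimitivesCoder falls back to pickle for types it does not know, which is where this blows up. Here is a sketch of how one could pin a proto-aware coder instead, untested on my side and assuming beam.coders.ProtoCoder fits BatchEntryPoint:

    import apache_beam as beam
    import data.build.common.v1.common_pb2 as common_pb2

    # Register a proto-aware coder so Beam serializes the message itself
    # instead of pickling the class.
    beam.coders.registry.register_coder(
        common_pb2.BatchEntryPoint, beam.coders.ProtoCoder)

    clear_message = (
        console_message
        | beam.ParDo(GetClearMessage()).with_output_types(
            common_pb2.BatchEntryPoint)
    )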

I do not understand this behavior. Do you know how to fix it?
Thanks

Kimor
  • The error is: `Can't pickle <class 'common.v1.common_pb2.BatchEntryPoint'>: import of module 'common.v1.common_pb2' failed`. In your script, you have `import data.build.common.v1.common_pb2 as common_pb2`, but in the stack trace the class is `common.v1.common_pb2.BatchEntryPoint`. It seems to be a Python import issue: https://stackoverflow.com/questions/42853617/python-fails-importing-package. – ningk Apr 28 '21 at 20:44
  • Isn't it because the problem occurs when I want to return an instance of BatchEntryPoint, and not when I import the common_pb2 module? – Kimor Apr 29 '21 at 07:48
  • I think it has something to do with how you organize the modules and import them. See an answer from this thread: https://stackoverflow.com/questions/6050391/pickle-cant-import-a-module-that-exists#:~:text=Pickle%20depends%20upon%20the%20module,if%20you%20do%20import%20module and this thread: https://stackoverflow.com/questions/2121874/python-pickling-after-changing-a-modules-directory – ningk Apr 29 '21 at 18:50

1 Answer


I did not fix the issue, but I found a workaround: instead of returning batch_entry_point, I yield each element of it, like this:

        for i in batch_entry_point.entrypoints:
            logging.info(i)
            obj = {}  # a plain dict per entry; adapt to whatever your next step expects
            obj['proto'] = i
            yield obj

Each element is then processed by the pipeline's next step.
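
For completeness, a sketch of the whole reworked process method (the dict with the 'proto' key is just what my next step expects; adapt it to your schema):

    class GetClearMessage(beam.DoFn):
        def process(self, element, **kwargs):
            batch_entry_point = common_pb2.BatchEntryPoint()
            data = element.data
            try:
                batch_entry_point.ParseFromString(data)
            except DecodeError:
                batch_entry_point.ParseFromString(gzip.decompress(data))
            # Yield the individual entry points instead of the whole
            # BatchEntryPoint, which was the object that failed to pickle.
            for i in batch_entry_point.entrypoints:
                yield {'proto': i}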
