
(Continued from a previous question)

I am trying to deploy a Google Dataflow job so that it runs as a cron job on Google App Engine, following the method described here.

I have a Dataflow script (written in Python) in a pipelines/script.py file. Running this script locally (using the Apache Beam DirectRunner) or on Google Cloud (using the DataflowRunner) works properly. However, when the job is deployed to run periodically on App Engine, it raises the following error when executed:

(4cb822d7f796239a): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 294, in apache_beam.runners.worker.operations.DoOperation.start (apache_beam/runners/worker/operations.c:10607)
    def start(self):
  File "apache_beam/runners/worker/operations.py", line 295, in apache_beam.runners.worker.operations.DoOperation.start (apache_beam/runners/worker/operations.c:10501)
    with self.scoped_start_state:
  File "apache_beam/runners/worker/operations.py", line 300, in apache_beam.runners.worker.operations.DoOperation.start (apache_beam/runners/worker/operations.c:9702)
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named pipelines.spanner_backup

This is the stack trace visible when accessing the job directly in the Dataflow panel of the Google Cloud Console. However, if I click on "Stack Traces" to see the error stack trace in the "Stackdriver Error Reporting" panel, I see the following trace instead:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 738, in run
    work, execution_context, env=self.environment)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workitem.py", line 130, in get_work_items
    work_item_proto.sourceOperationTask.split)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/workercustomsources.py", line 142, in __init__
    source_spec[names.SERIALIZED_SOURCE_KEY]['value'])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 225, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 423, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named spanner.client

This suggests some import error when objects are shared between workers? Google Spanner should be properly installed, though.

I am using:

Flask==0.12.2 
apache-beam[gcp]==2.1.1 
gunicorn==19.7.1 
gevent==1.2.1
google-cloud-dataflow==2.1.1 
google-cloud-spanner==0.26

Am I missing something?
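For context, the App Engine side is just a small Flask service that a cron job hits on a schedule, along these lines (a simplified sketch; the route name and the run() entry point are illustrative, not my exact code):

# Simplified sketch of the App Engine service: a cron.yaml entry requests
# this route on a schedule, and the handler launches the Dataflow pipeline.
# The route and the run() entry point are illustrative names.
from flask import Flask

from pipelines import script  # the pipelines/script.py module mentioned above

app = Flask(__name__)


@app.route('/start-spanner-backup')
def start_spanner_backup():
    script.run()  # assumed entry point that builds the options and runs the pipeline
    return 'Dataflow job launched.', 200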

Edit: my setup.py is the following (as described here; the corresponding GitHub link, with comments, is here):

from distutils.command.build import build as _build
import subprocess
import setuptools

class build(_build):  # pylint: disable=invalid-name
  """Build command that also triggers the custom commands defined below."""
  sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Shell commands executed on the workers when the staged package is installed.
CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!']]


class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print 'Running command: %s' % command_list
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print 'Command output: %s' % stdout_data
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)

REQUIRED_PACKAGES = ["Flask==0.12.2",
                     "apache-beam[gcp]==2.1.1",
                     "gunicorn==19.7.1",
                     "gevent==1.2.1",
                     "google-cloud-dataflow==2.1.1",
                     "google-cloud-spanner==0.26"
                     ]

setuptools.setup(
    name='dataflow_python_pipeline',
    version='1.0.0',
    description='DataFlow Python Pipeline',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
        }
    )
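This setup.py follows the Beam "juliaset" example. If extra worker-level commands were needed (as the comments below discuss), CUSTOM_COMMANDS could be extended along these lines (a hypothetical sketch, not commands I actually run):

# Hypothetical extension of CUSTOM_COMMANDS: extra shell commands run on each
# Dataflow worker when the staged package is installed. Python dependencies
# are usually better declared in REQUIRED_PACKAGES instead.
CUSTOM_COMMANDS = [
    ['echo', 'Custom command worked!'],
    ['apt-get', 'update'],
    ['pip', 'install', 'google-cloud-spanner==0.26'],
]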
  • Do you have ```--save_main_session``` in pipeline options? If yes, try to remove it – Marcin Zablocki Oct 18 '17 at 10:25
  • Thanks for the reformatting. I do, it was needed to make the job run with the DataflowRunner when submitting the job from my computer. Removing it results in the same error, though. – Pascal Delange Oct 18 '17 at 12:20
  • OK, please add contents of your ```setup.py``` too. – Marcin Zablocki Oct 18 '17 at 12:44
  • I added it. Am I supposed to add "pip install ***" for all the modules I need on the workers in CUSTOM_COMMANDS in the setup.py file? – Pascal Delange Oct 18 '17 at 12:59
  • Either this or you could try to fill ```REQUIRED_PACKAGES``` with your modules, like this: ```REQUIRED_PACKAGES=["google-cloud-spanner==0.26", "another-module==1.0"]``` etc... – Marcin Zablocki Oct 18 '17 at 13:28
  • I have the same error when doing this (that being said, I should definitely be importing the modules like this). Note however that import error I get does not concern an external module, but my own pipeline "spanner_backup", which I put in the folder "pipelines". – Pascal Delange Oct 18 '17 at 14:10
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/157007/discussion-between-marcin-zablocki-and-pascal-delange). – Marcin Zablocki Oct 18 '17 at 14:12

1 Answer


Here is the solution to my problem, for the record. Thanks to Marcin Zablocki for helping me out.

It appears I was not properly linking the setup file to the pipeline. By replacing

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(SetupOptions).requirements_file = "requirements.txt"
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

with

pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).setup_file = "./setup.py"
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'

(and listing the modules to install in the setup.py file rather than in requirements.txt), as well as importing the modules I use inside the ParDos rather than at the top of the file, I was able to deploy the script.

Not doing so seems to lead to strange, undefined behavior (such as a function not finding classes defined in the same file) rather than to clear error messages.
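For illustration, the options classes above come from apache_beam.options.pipeline_options, and moving the Spanner import inside the DoFn looks roughly like this (a minimal sketch; the instance, database and query are placeholders, not my actual ones):

import apache_beam as beam


class ReadFromSpannerFn(beam.DoFn):
    """Illustrative DoFn: the Spanner import lives inside process()."""

    def process(self, element):
        # Importing here means google.cloud.spanner is resolved on the
        # Dataflow worker at run time, instead of being pickled together
        # with the main session.
        from google.cloud import spanner

        client = spanner.Client()
        instance = client.instance('my-instance')    # placeholder
        database = instance.database('my-database')  # placeholder
        with database.snapshot() as snapshot:
            for row in snapshot.execute_sql('SELECT 1'):  # placeholder query
                yield row

The DoFn is then applied with beam.ParDo(ReadFromSpannerFn()) in the pipeline as usual.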
