1

The problem

I'm writing a GCP cloud function that takes an input id from a pubsub message, process, and output the table to BigQuery.

The code is as followed:

from __future__ import absolute_import
import base64
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient
import os


def processing_data_function():
    # do stuff and return desired data 

def create_data_from_id():
    # take scrapinghub's job id and extract the data through api 

def run(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    # Take pubsub message and also Scrapinghub job's input id 
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')  

    agrv = ['--project=project-name', 
            '--region=us-central1', 
            '--runner=DataflowRunner', 
            '--temp_location=gs://temp/location/', 
            '--staging_location=gs://staging/location/']
    p = beam.Pipeline(options=PipelineOptions(agrv))
    (p
        | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
        | 'Trim b string' >> beam.FlatMap(processing_data_function)
        | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
                'table_name',
                schema=schema,
                # Creates the table in BigQuery if it does not yet exist.
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    run()

Note that 2 functions create_data_from_id and processing_data_function process data from Scrapinghub (a scraping site for scrapy) and they're quite lengthy so I don't want to include them here. They have nothing to do with the error as well since this code works if I run it from the cloud shell and pass arguments using argparse.ArgumentParser() instead.

Regarding the error I have, while there was no problem deploying the code and the pubsub message could trigger the function successfully, the data flow job failed and reported this error:

"Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
  File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
  File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
  File "apache_beam/runners/worker/operations.py", line 616, in apache_beam.runners.worker.operations.DoOperation.setup
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'main'

What I've tried

Given that I could run the same pipeline from the cloud shell but using the argument parser instead of specifying the options, I thought that the way the options stated were the problem. Hence, I tried different combinations of the options, which were with or without --save_main_session, --staging_location, --requirement_file=requirements.txt, --setup_file=setup.py ... They all reported more-or-less the same issue, all with dill don't know what module to pick up. With save_main_session specified, the main session couldn't be picked up. With requirement_file and setup_file specified the job was not even successfully created so I would save you the trouble of looking into its error. My main problem is I don't know where this problem came from because I've never used dill before and why is it so different running the pipeline from shell and from cloud functions? Does anybody have a clue?

Thanks

pa-nguyen
  • 417
  • 1
  • 5
  • 16

2 Answers2

0

You may also try modifying the last part as and test if the following works:

if __name__ == "__main__":
    ...

Additionally, make sure you are executing the script in the correct folder, as it might have to do with the naming or the location of your files in your directories.

Please take into consideration the following sources, which you may find helpful: Source 1, Source 2

I hope this information helps.

  • 1
    The error occured when I called a deployed cloud function on GCP, not when I ran the script with the command line, so I don't think its because of the location of my files. The cloud function also has the typical structure of normal cloud function on GCP – pa-nguyen Sep 23 '20 at 13:46
  • Are you still facing the issue? – Artemis Georgakopoulou Oct 07 '20 at 13:45
0

You're maybe using gunicorn to start the app on Cloud Run (as a standard practice) like:

CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

I faced with the same problem, and found a workaround as to start the app without gunicorn:

CMD exec python3 main.py

Probably, it's because gunicorn skips the main context and directly launch the main:app object. I don't know how to fix it with using gunicorn.

=== Additional note ===

I found a way to use gunicorn.

  1. Move a function (that starts a pipeline) to another module such as df_pipeline/pipe.py.
.
├── df_pipeline
│   ├── __init__.py
│   └── pipe.py
├── Dockerfile
├── main.py
├── requirements.txt
└── setup.py
# in main.py
import df_pipeline as pipe
result = pipe.preprocess(....)
  1. Create setup.py in the same directory as main.py
# setup.py
import setuptools
setuptools.setup(
    name='df_pipeline',
    install_requires=[],
    packages=setuptools.find_packages(include=['df_pipeline']),
)
  1. Set the pipeline option setup_file as ./setup.py in df_pipeline/pipe.py.
etusji
  • 138
  • 3