The problem
I'm writing a GCP Cloud Function that takes an input id from a Pub/Sub message, processes the data, and outputs a table to BigQuery.
The code is as follows:
from __future__ import absolute_import
import base64
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from scrapinghub import ScrapinghubClient


def processing_data_function(element):
    # do stuff and return the desired data
    pass


def create_data_from_id(job_id):
    # take Scrapinghub's job id and extract the data through its API
    pass


def run(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # Take the Pub/Sub message, which is also the Scrapinghub job's input id
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    argv = ['--project=project-name',
            '--region=us-central1',
            '--runner=DataflowRunner',
            '--temp_location=gs://temp/location/',
            '--staging_location=gs://staging/location/']
    p = beam.Pipeline(options=PipelineOptions(argv))
    (p
     | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(pubsub_message))
     | 'Trim b string' >> beam.FlatMap(processing_data_function)
     | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
         'table_name',
         schema=schema,  # table schema, defined elsewhere
         # Creates the table in BigQuery if it does not yet exist.
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    run()
Note that the two functions create_data_from_id and processing_data_function process data from Scrapinghub (a hosting platform for Scrapy spiders) and are quite lengthy, so I haven't included them here. They have nothing to do with the error either, since this code works if I run it from Cloud Shell and pass arguments using argparse.ArgumentParser() instead.
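For reference, the Cloud Shell version looks roughly like this (a simplified sketch: the run_cli wrapper and the --job_id flag name are placeholders, not my exact code, and beam, PipelineOptions, create_data_from_id, processing_data_function and schema are the same as in the snippet above):

import argparse

def run_cli(argv=None):
    # Parse the Scrapinghub job id myself and hand everything else to Beam,
    # so PipelineOptions is built from the remaining command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument('--job_id', required=True,
                        help='Scrapinghub job id to read data from')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'Read from Scrapinghub' >> beam.Create(create_data_from_id(known_args.job_id))
     | 'Trim b string' >> beam.FlatMap(processing_data_function)
     | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
         'table_name',
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()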
Regarding the error: while there was no problem deploying the code and the Pub/Sub message triggered the function successfully, the Dataflow job failed and reported this error:
"Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "apache_beam/runners/worker/operations.py", line 662, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 664, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 665, in apache_beam.runners.worker.operations.DoOperation.start
File "apache_beam/runners/worker/operations.py", line 284, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 290, in apache_beam.runners.worker.operations.Operation.start
File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.setup
File "apache_beam/runners/worker/operations.py", line 616, in apache_beam.runners.worker.operations.DoOperation.setup
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 826, in _import_module
return __import__(import_name)
ModuleNotFoundError: No module named 'main'
What I've tried
Given that I could run the same pipeline from Cloud Shell, but using the argument parser instead of specifying the options directly, I thought the way the options were stated was the problem. So I tried different combinations of options, with or without --save_main_session, --staging_location, --requirements_file=requirements.txt, --setup_file=setup.py, etc. They all reported more or less the same issue: dill doesn't know which module to pick up. With save_main_session specified, the main session couldn't be pickled. With requirements_file and setup_file specified, the job wasn't even created successfully, so I'll spare you the trouble of looking into those errors. My main problem is that I don't know where this error comes from, because I've never used dill before, and I don't understand why running the pipeline from the shell is so different from running it from a Cloud Function. Does anybody have a clue?
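For example, one of the combinations I tried looked roughly like this (the flags are standard Beam pipeline options; the paths are placeholders):

argv = ['--project=project-name',
        '--region=us-central1',
        '--runner=DataflowRunner',
        '--temp_location=gs://temp/location/',
        '--staging_location=gs://staging/location/',
        '--save_main_session',      # also tried without this flag
        '--setup_file=./setup.py']  # or --requirements_file=requirements.txt
p = beam.Pipeline(options=PipelineOptions(argv))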
Thanks