I'd like to embed my Dataflow pipeline inside a Cloud Function without using a template. I ran into an error at first and, according to this answer, I should package my code as a dependency. This is the structure of my Cloud Function:
File wb_flow.py:

    import apache_beam as beam
    import cbsodata
    from apache_beam.options.pipeline_options import PipelineOptions


    def main(identifier, schema_file):
        """The main function which creates the pipeline and runs it."""
        table_name = f"wijken_en_buurten_{cbsodata.get_info(identifier)['Period']}"
        pipeline_options = PipelineOptions(
            [
                '--runner', 'DataflowRunner',
                '--project', 'veneficus',
                '--region', 'europe-west4',
                '--temp_location', 'gs://vf_etl/test',
                '--staging_location', 'gs://vf_etl/temp',
                '--setup_file', 'setup.py'
            ]
        )
        p = beam.Pipeline(options=pipeline_options)
        (p
         | 'Read from BQ Table' >> beam.Create(cbsodata.get_data(identifier))
         | 'Write Projects to BigQuery' >> beam.io.WriteToBigQuery(
             f"cbs.{table_name}",
             schema=schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
         )
        p.run()  # .wait_until_finish()
File main.py:

    import base64

    from wb_flow import main


    def run(event, context):
        """Cloud Function entry point: decodes the event payload and starts the pipeline."""
        message = base64.b64decode(event['data']).decode('utf-8').split(',')
        identifier, schema_file = message[0], message[1]
        main(identifier, schema_file)
and setup.py:

    import setuptools

    setuptools.setup(
        name='wb_flow',
        version='1.0.0',
        install_requires=[],
        packages=setuptools.find_packages(),
    )
I got this error while the Dataflow pipeline was being constructed:

      File "/layers/google.python.pip/pip/lib/python3.8/site-packages/apache_beam/runners/portability/stager.py", line 579, in _build_setup_package
        os.chdir(os.path.dirname(setup_file))
    FileNotFoundError: [Errno 2] No such file or directory: ''

I believe this means that it couldn't find my setup.py. How can I specify the path to my setup file?
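
The traceback is consistent with a relative path being passed: the failing line calls os.chdir on the directory part of the setup file path, and a bare file name has none. Below is a minimal sketch, independent of Beam, that reproduces that behaviour and shows one way to build an absolute path instead. The helper name resolve_setup_file is hypothetical and not part of any library.

```python
import os


def resolve_setup_file(module_file, name="setup.py"):
    """Return an absolute path to `name` in the same directory as `module_file`.

    Hypothetical helper for illustration; it only manipulates paths and
    does not check that the file exists.
    """
    return os.path.join(os.path.dirname(os.path.abspath(module_file)), name)


# A bare file name has an empty directory component ...
bare_dirname = os.path.dirname("setup.py")

# ... and os.chdir('') raises FileNotFoundError, as in the traceback above.
try:
    os.chdir(bare_dirname)
    chdir_failed = False
except FileNotFoundError:
    chdir_failed = True

# An absolute path always has a non-empty directory component.
absolute_setup = resolve_setup_file("wb_flow.py")
```

Under this assumption, wb_flow.py could pass '--setup_file', resolve_setup_file(__file__) instead of the bare 'setup.py'. This only addresses the empty-directory error; whether the packaging step itself then succeeds is a separate question.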
Alternatively, I tried to do this without setup.py, and Dataflow said it couldn't find the wb_flow module.
Update
When I specified my setup path as /workspace/setup.py, I got this error:

    subprocess.CalledProcessError: Command '['/layers/google.python.pip/pip/bin/python3', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp3rxejr4g']' returned non-zero exit status 1."
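
The CalledProcessError message reports only the exit status, not what the sdist command printed, so the underlying cause is not visible here. A rough sketch of how the command's output could be captured for debugging follows. The helper run_and_capture is hypothetical, and the demo command is a stand-in child process rather than the real setup.py invocation.

```python
import subprocess
import sys


def run_and_capture(cmd, cwd=None):
    """Run `cmd` and return (exit code, stdout, stderr) without raising."""
    result = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
    return result.returncode, result.stdout, result.stderr


# Stand-in for the failing command: a child process that writes to stderr
# and exits with status 1.
demo_cmd = [sys.executable, "-c", "import sys; sys.stderr.write('boom'); sys.exit(1)"]
code, out, err = run_and_capture(demo_cmd)
```

Inside the Cloud Function, the same helper could be called with the command from the error message (python, 'setup.py', 'sdist', '--dist-dir' and a temporary directory of your choosing) and cwd set to the directory containing setup.py, logging the returned stderr to see why packaging fails.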