I have a simple Google Cloud HTTP-triggered function that is responsible for launching a Dataflow job, which loads data from a CSV file on Cloud Storage into a BigQuery table.
My code is given below:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import SetupOptions, PipelineOptions

PROJECT = 'proj'
BUCKET = 'BUCKET'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT,ibu:STRING,brewery_id:STRING'
DATAFLOW_JOB_NAME = 'jobname'

def execute(request):
    # Pipeline options for launching the job on the Dataflow runner
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name={0}'.format(DATAFLOW_JOB_NAME),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--region=europe-west2',
        '--runner=DataflowRunner'
    ]
    #p = beam.Pipeline(argv=argv)
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    input = 'gs://{0}/beers.csv'.format(BUCKET)
    print('step-222')

    # Read the CSV, split each line into fields, map to dicts, and write to BigQuery
    (p | 'ReadData' >> beam.io.ReadFromText(input, skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table='data',
           dataset='sandbox',
           project=PROJECT,
           schema=SCHEMA,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
       ))
    p.run()
    return "success"
The function runs successfully and it does create a Dataflow job, but the Dataflow job fails within 40 seconds without ever rendering the job graph in the Dataflow console.
It fails with the following error: