
I have a simple Google Cloud HTTP-triggered function which is responsible for launching a Dataflow runner job that loads data from a CSV file on Cloud Storage into a BigQuery table.

My code is given below:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import SetupOptions, PipelineOptions

PROJECT = 'proj'
BUCKET='BUCKET'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT,ibu:STRING,brewery_id:STRING'
DATAFLOW_JOB_NAME = 'jobname'


def execute(request):
    argv = [
      '--project={0}'.format(PROJECT),
      '--job_name={0}'.format(DATAFLOW_JOB_NAME),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--region=europe-west2',
      '--runner=DataflowRunner'
    ]

    #p = beam.Pipeline(argv=argv)
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)
    input = 'gs://{0}/beers.csv'.format(BUCKET)
    print ('step-222')

    (p | 'ReadData' >> beam.io.ReadFromText(input, skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]}) 
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table='data',
           dataset='sandbox',
           project=PROJECT,
           schema=SCHEMA,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
           ))
    p.run()
    return "success"

The function runs successfully and it also creates a Dataflow job, but the Dataflow job fails within 40 seconds without creating the graph view, and exits with an error (screenshots of the job page and the error message omitted).


1 Answer


As @captainnabla said in his comment, you have to create a subnetwork and pass it as an option to your Dataflow job.

  • Solution 1

In the default VPC of the project, create the subnetwork for Dataflow

If you don’t specify a subnetwork, the job usually uses the project’s default VPC network. I don’t know why that didn’t work in your case (maybe the default network picked up by the job belongs to a project other than the one executing the job). A minimal sketch of creating the subnetwork programmatically is shown below.
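For example, with the google-cloud-compute client library (pip install google-cloud-compute) a subnetwork can be created in the default network roughly like this. This is only a sketch: the subnetwork name, CIDR range and region are assumptions, so adjust them to your setup.

from google.cloud import compute_v1

PROJECT = 'proj'
REGION = 'europe-west2'

subnet = compute_v1.Subnetwork()
subnet.name = 'dataflow-subnet'           # hypothetical subnetwork name
subnet.ip_cidr_range = '10.10.0.0/24'     # example range, pick a free one in your VPC
subnet.network = 'projects/{0}/global/networks/default'.format(PROJECT)
subnet.private_ip_google_access = True    # lets workers reach Google APIs without external IPs

client = compute_v1.SubnetworksClient()
operation = client.insert(project=PROJECT, region=REGION, subnetwork_resource=subnet)
operation.result()  # wait for the create operation to finish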

  • Solution 2

Create another VPC for your data pipelines and a subnetwork for Dataflow

The network configuration depends on your team’s strategy.

In both solutions, you can pass the subnetwork as a program argument to your Dataflow job:

--subnetwork=https://www.googleapis.com/compute/v1/projects/{PROJECT_ID}/regions/{REGION}/subnetworks/{SUBNETWORK}
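In your Cloud Function, that would look something like this (a minimal sketch based on your argv list; the subnetwork name dataflow-subnet is an assumption, replace it with the one you created):

SUBNETWORK = 'dataflow-subnet'  # hypothetical name, use your own subnetwork

argv = [
    '--project={0}'.format(PROJECT),
    '--job_name={0}'.format(DATAFLOW_JOB_NAME),
    '--staging_location=gs://{0}/staging/'.format(BUCKET),
    '--temp_location=gs://{0}/staging/'.format(BUCKET),
    '--region=europe-west2',
    '--runner=DataflowRunner',
    # fully qualified subnetwork URL, as in the option above
    '--subnetwork=https://www.googleapis.com/compute/v1/projects/{0}/regions/europe-west2/subnetworks/{1}'.format(PROJECT, SUBNETWORK)
]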
    Correct! Thanks Mazlum. Just wanted to add that "default" is the default network used in case it is not specified (and should exist by default in most projects), so somehow it doesn't exist anymore. – Bruno Volpato Jan 03 '23 at 18:07
  • You’re welcome Bruno, you are right and thanks for these details. – Mazlum Tosun Jan 03 '23 at 18:13