1

I have a pipeline that gets data from BigQuery and writes it to GCS, however, if I find any rejects I want to right them to a Bigquery table. I am collecting rejects into a global list variable and later loading the list into BigQuery table. This process works fine in when I run it locally as the pipelines were running in the right order. When I run it using dataflowrunner, it doesn't guarantee the order ( I want pipeline1 to run before pipeline2. Is there a way to have dependent pipelines in Dataflow using python? or Also please suggest if this can be solved in with better approach. Thanks in advance.

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WriteToText(output)
               )

# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WriteToBigQuery(....)
                    )
Bob
  • 335
  • 1
  • 4
  • 16
  • Does this answer your question? [Executing a pipeline only after another one finishes on google dataflow](https://stackoverflow.com/questions/49191548/executing-a-pipeline-only-after-another-one-finishes-on-google-dataflow) – rmesteves Sep 22 '20 at 15:04
  • @R.Esteves Thanks for the response. I did try using this - pipeline1.run().wait_until_finish(). It didn't work in Dataflow using python – Bob Sep 23 '20 at 02:28
  • Did you try using your first pCollection as a input for the second pipeline? – rmesteves Sep 23 '20 at 10:30
  • Are you suggesting something like this? I am getting assert isinstance(pbegin, pvalue.PBegin) AssertionError `with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> .... ) # Loading the rejects gathered in the above pipeline to Biquery with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2: rejects = (data | 'create pipeline' >> beam.Create(reject_list) | ..... )` – Bob Sep 23 '20 at 15:37
  • Try putting both PCollections inside the same Pipeline like this :with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> .... ) # Loading the rejects gathered in the above pipeline to Biquery rejects = (data | 'create pipeline' >> beam.Create(reject_list) | ..... ) – rmesteves Sep 24 '20 at 15:13
  • Same error - assert isinstance(pbegin, pvalue.PBegin) – Bob Sep 24 '20 at 16:13
  • This is probably happening because your first PCollection is writing your data to a external place what makes it not available anymore. Try creating one pipeline with the structure below – rmesteves Sep 25 '20 at 09:30
  • with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1: data = (pipeline1 | 'get data' >> .... (until one step before writing it) ) rejects = (data | 'create pipeline' >> beam.Create(reject_list) | ...) write = (data | 'to gcs' >> beam.io.WriteToText(output) ) – rmesteves Sep 25 '20 at 09:31
  • I did exactly what you said above and got the same error :( – Bob Sep 27 '20 at 17:38
  • As a simple test, I had only below two lines in my program. It doesn't like taking pcollection as input and using something else as a data source. I got the same error. data = (pipeline1 | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query='select ....') reject_list = ['abc','def','ghi'] rejects = (data | 'create pipeline' >> beam.Create(reject_list)) `INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner. Traceback (most recent call last): ... ..... assert isinstance(pbegin, pvalue.PBegin) AssertionError` – Bob Sep 28 '20 at 15:18

1 Answers1

3

You can do something like that, but with only 1 pipeline, and some additional code in the transformation.

The beam.Map(lambda x: somefunction) should have two outputs: the one that is written to GCS, and the rejected elements that will be eventually written to BigQuery.

For that, your transform function would have to return a TaggedOutput.

There is an example in the Beam Programming Guide: https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn

The second PCollection, you can then write to BigQuery.

You don't need to have a Create in this second branch of the pipeline.

The pipeline would be something like this:

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(transform)  # Tagged output produced here

    pcoll_to_gcs = data.gcs_output
    pcoll_to_bq  = data.rejected

    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)

Then the transform function would be something like this

def transform(element):
  if something_is_wrong_with_element:
    yield pvalue.TaggedOutput('rejected', element)

  transformed_element = ....

  yield pvalue.TaggedOutput('gcs_output', transformed_element)
Israel Herraiz
  • 611
  • 3
  • 8