
I'm trying to pass a BigQuery table name as a value provider for an Apache Beam pipeline template. According to their documentation and this StackOverflow answer, it's possible to pass a value provider to apache_beam.io.gcp.bigquery.ReadFromBigQuery.

This is the code for my pipeline:

class UserOptions(PipelineOptions):
    """Define runtime argument"""
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)

(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
         user_options.input
     ))
When I run the code locally, the value for user_options.input is passed on the command line as --input projectid.dataset_id.table

However, I had the error:

ValueError: A BigQuery table or a query must be specified

I tried:

  • Passing projectid:dataset_id.table

  • Using bigquery.TableReference -> not possible

  • Using f'{user_options.input}'

  • Passing a query -> works when run locally but fails when I call the template on GCP. Error statement:

    missing dataset while no default dataset is set in the request.", "errors": [ { "message": "Table name "RuntimeValueProvider(option: input, type: str, default_value: None)" missing dataset while no default dataset is set in the request.", "domain": "global", "reason": "invalid" } ], "status": "INVALID_ARGUMENT" } } >

What am I missing?

pa-nguyen

1 Answer


The table argument must be passed by name to ReadFromBigQuery.

BigQuerySource (deprecated) accepts a table as the first argument so you can pass one in by position (docs). But ReadFromBigQuery expects the gcs_location as the first argument (docs). So if you are porting code from using BigQuerySource to using ReadFromBigQuery and you weren't explicitly passing the table in by name, it will fail with the error you received.
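The pitfall can be illustrated with a plain-Python sketch. The read_from_bigquery function below is a hypothetical, simplified stand-in for the transform's signature (not Beam's actual implementation); it only shows how a positional argument silently binds to the wrong parameter:

```python
def read_from_bigquery(gcs_location=None, table=None, query=None):
    """Simplified stand-in: like ReadFromBigQuery, the first
    positional parameter is gcs_location, not table."""
    if table is None and query is None:
        raise ValueError("A BigQuery table or a query must be specified")
    return {"gcs_location": gcs_location, "table": table, "query": query}

# Passing the table by keyword binds it correctly:
ok = read_from_bigquery(table="project:dataset.table")
assert ok["table"] == "project:dataset.table"

# Passing it positionally binds it to gcs_location instead,
# so the table/query check fails:
try:
    read_from_bigquery("project:dataset.table")
except ValueError as e:
    print(e)  # A BigQuery table or a query must be specified
```

The same binding rule applies to the real transform, which is why table= must be spelled out.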

Here are two working examples and one that does not work:

import apache_beam as beam

project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'

if __name__ == "__main__":
    args = [
        '--temp_location=gs://my_temp_bucket',
    ]
    # This works:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
        )
    # So does this:
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery' 
          >> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
        )
    # But this doesn't work because the table argument is not passed in by name.
    # The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
    with beam.Pipeline(argv=args) as pipeline:
        query_results = (
          pipeline
          | 'Read from BigQuery'
          >> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
        )
Peter Boone