I'm building an Apache Beam pipeline using GCP Dataflow to process incoming events which need to be written in separate BigQuery tables depending on the content of the event. The decision of which table the data needs to be written to happens in one of the stages of the pipeline. My problem is how do I dynamicallys ett he name of the table that the data needs to go into. Also, in some cases, data needs to be written to two tables, after applying a transform.
I have gone through the solutions posted on these links, but it seems that they could be for old versions of google-cloud/apache-beam and are not working for me:
- Dynamically set bigquery table id in dataflow pipeline
- Writing different values to different BigQuery tables in Apache Beam
Attaching a sample pipeline using DirectRunner where I tried to follow the 2nd link mentioned above:
#Standard Python Imports
import argparse
import logging
import json
#3rd Party Imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.options.pipeline_options import PipelineOptions
def transform_entry(line):
return json.loads(line)
def getTableName(entry):
if (entry["tablename"] == "table1"):
return "table1"
else:
return "table2"
def getRow(entry):
return entry["dataRow"]
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--temp_location',
default='<<TEMPORARY LOCATION>>')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=pipeline_options) as p:
writeData = (p
| 'ReadInput' >> beam.io.ReadFromText('./sample_input.json')
| 'Parse' >> beam.Map(transform_entry))
eventRow = (writeData
| 'Get Data Row' >> beam.map(getRow)
| 'Write Event Row' >> beam.io.gcp.bigquery.WriteToBigQuery(
project='<<GCP PROJECT>>',
dataset='<<DATASET NAME>>',
table=getTableName,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
))
print(eventRow)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.ERROR)
run()
Could someone please help me out with this?
Attaching the traceback here:
/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery.py:1992: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
<apache_beam.io.gcp.bigquery.WriteResult object at 0x7f2421660100>
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1571, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1521, in process
yield (self.destination(element, *side_inputs), element)
File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 85, in getTableName
KeyError: 'tablename'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 56, in <module>
run()
File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 37, in run
with beam.Pipeline(options=pipeline_options) as p:
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
self.result = self.run()
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 553, in run
self._options).run(False)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
return self.runner.run_pipeline(self, self._options)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 201, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 222, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 453, in run_stages
bundle_results = self._execute_bundle(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 781, in _execute_bundle
self._run_bundle(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1010, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1346, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
return getattr(self, request_type)(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 1021, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 1030, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1432, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 817, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 981, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1571, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "/home/animesh/.local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1521, in process
yield (self.destination(element, *side_inputs), element)
File "/home/animesh/Documents/cliqmetrics/logger/dataflow-pipeline/stackques/stackpipe.py", line 85, in getTableName
KeyError: "tablename [while running 'Write Event Row/_StreamToBigQuery/AppendDestination']"