I am trying to run a Dataflow pipeline in Python (3.9) via a Flex Template. It runs a query in BigQuery (non-nested/non-repeated records) and writes the results to a database with beam_nuggets; my test database is Postgres. The read from BigQuery yields Python dictionaries as expected, but when I try to write them to the database the pipeline fails.
Test data written to GCS from BigQuery.
{'order_id': 'CM-2011-110', 'order_date': '4/10/2011', 'ship_date': '10/10/2011', 'ship_mode': 'Standard Class', 'customer_name': 'Alejandro Grove', 'segment': 'Consumer', 'state': 'Est', 'country': 'Cameroon', 'market': 'Africa', 'region': 'Africa', 'product_id': 'OFF-CAR-10002031', 'category': 'Office Supplies', 'sub_category': 'Binders', 'product_name': 'Cardinal 3-Hole Punch, Durable', 'sales': 30, 'quantity': 1, 'discount': 0.0, 'profit': 13.92, 'shipping_cost': 2.57, 'order_priority': 'Medium', 'year': 2011}
{'order_id': 'CM-2011-110', 'order_date': '4/10/2011', 'ship_date': '10/10/2011', 'ship_mode': 'Standard Class', 'customer_name': 'Alejandro Grove', 'segment': 'Consumer', 'state': 'Est', 'country': 'Cameroon', 'market': 'Africa', 'region': 'Africa', 'product_id': 'TEC-CAN-10002879', 'category': 'Technology', 'sub_category': 'Copiers', 'product_name': 'Canon Copy Machine, High-Speed', 'sales': 521, 'quantity': 2, 'discount': 0.0, 'profit': 93.72, 'shipping_cost': 30.83, 'order_priority': 'Medium', 'year': 2011}
Current Code:
def run(save_main_session=True):
    beam_options = PipelineOptions()
    args = beam_options.view_as(MyOptions)

    with beam.Pipeline(options=beam_options) as p:
        db_source = db_reader(args.bqprojectid, args.dataset, args.table, args.limit)
        db = db_writer(args.destinationip, args.port, args.destinationusername, args.destinationpassword, args.destinationtable, args.database_name)

        result = (
            p
            | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query())
            # I would like to remove this step, but it helps with debugging.
            | 'Write to GCS' >> WriteToText('SOMEURI')
            | 'Write to Database' >> relational_db.Write(
                source_config=db.sink_config(),
                table_config=db.table_config()
            )
        )
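For context, db_reader and db_writer are small helper classes of mine that are not shown here. As an assumption, db_writer.sink_config() and db_writer.table_config() return the standard beam-nuggets configuration objects, roughly like this (a sketch; connection values are placeholders):

from beam_nuggets.io import relational_db

# Hypothetical sketch of what db_writer builds internally; the real helper
# class is not shown in the question.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host='<destination ip>',
    port=5432,
    username='<username>',
    password='<password>',
    database='<database name>',
    create_if_missing=True,
)
table_config = relational_db.TableConfiguration(
    name='<destination table>',
    create_if_missing=True,
)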
I went through this answer: Write BigQuery results to GCS in CSV format using Apache Beam, but was wondering if there is an easier way to transform the output from BigQuery to mirror this structure:
{'name': 'Jan', 'num': 1},
{'name': 'Feb', 'num': 2},
I was considering regex, but I do not think that would be a best practice.
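A Map step also crossed my mind, since beam.io.ReadFromBigQuery already yields dict-like rows; something along these lines (untested) instead of regex:

dicts = (
    p
    | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query())
    # Coerce each row into a plain Python dict.
    | 'To plain dicts' >> beam.Map(dict)
)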
Please let me know if there is anything I can clarify.
Have a great day!
Error Message
File "/usr/local/lib/python3.9/site-packages/beam_nuggets/io/relational_db.py", line 181, in process
assert isinstance(element, dict)
I checked the credentials and, by creating a static PCollection from the beam-nuggets docs, realized that the structure of the data is the issue. That static collection (months) uploaded successfully:
months = p | "Reading month records" >> beam.Create([
{'name': 'Jan', 'num': 1},
{'name': 'Feb', 'num': 2},
])
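For reference, that static collection was written with the same relational_db.Write sink, roughly as follows (assuming the same db_writer configuration as in the pipeline above):

months | 'Writing months to DB' >> relational_db.Write(
    source_config=db.sink_config(),
    table_config=db.table_config()
)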
Resolution (via the answer provided below):
import logging

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

# MyOptions, db_reader, and db_writer are custom helpers defined elsewhere
# (see the MyOptions sketch after this snippet).


def map_to_beam_nuggets_data(element):
    return {
        'order_id': element['order_id'],
        'order_date': element['order_date'],
        'ship_date': element['ship_date'],
        'ship_mode': element['ship_mode'],
        'customer_name': element['customer_name'],
        'segment': element['segment'],
        'state': element['state'],
        'country': element['country'],
        'market': element['market'],
        'region': element['region'],
        'product_id': element['product_id'],
        'category': element['category'],
        'sub_category': element['sub_category'],
        'product_name': element['product_name'],
        'sales': element['sales'],
        'quantity': element['quantity'],
        'discount': element['discount'],
        'profit': element['profit'],
        'shipping_cost': element['shipping_cost'],
        'order_priority': element['order_priority'],
    }
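# Note (untested assumption): if every column should pass through unchanged,
# the explicit field list above could be reduced to `return dict(element)`.
# Listing the fields keeps the target schema explicit.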
# This is the controller function.
def run(save_main_session=True):
    beam_options = PipelineOptions()
    args = beam_options.view_as(MyOptions)

    with beam.Pipeline(options=beam_options) as p:
        db_source = db_reader(args.bqprojectid, args.dataset, args.table, args.limit)
        db = db_writer(args.destinationip, args.port, args.destinationusername, args.destinationpassword, args.destinationtable, args.database_name)

        # Result from BigQuery.
        result = p | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query())

        # Sink the result to GCS.
        result | 'Write to GCS' >> WriteToText('SOMEURI')

        # Map the result to beam-nuggets data and sink it to the database.
        (result
         | 'Map to beam nuggets data' >> beam.Map(map_to_beam_nuggets_data)
         | 'Write to Database' >> relational_db.Write(
             source_config=db.sink_config(),
             table_config=db.table_config()
         ))
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
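MyOptions, db_reader, and db_writer are custom helpers that were not part of the question. For completeness, MyOptions is assumed to be a plain PipelineOptions subclass along these lines (a sketch; only the argument names used above are taken from the original code):

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # BigQuery source arguments.
        parser.add_argument('--bqprojectid')
        parser.add_argument('--dataset')
        parser.add_argument('--table')
        parser.add_argument('--limit')
        # Destination database arguments.
        parser.add_argument('--destinationip')
        parser.add_argument('--port')
        parser.add_argument('--destinationusername')
        parser.add_argument('--destinationpassword')
        parser.add_argument('--destinationtable')
        parser.add_argument('--database_name')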