
I am trying to run a Dataflow pipeline in Python (3.9) via a Flex Template that runs a query in BigQuery (non-nested/non-repeated records) and writes the data to a database with beam_nuggets. My test database is Postgres. The read from BigQuery yields Python dictionaries as expected, but when I try to write them to the database the pipeline fails.

Test data written to GCS from BigQuery.

{'order_id': 'CM-2011-110', 'order_date': '4/10/2011', 'ship_date': '10/10/2011', 'ship_mode': 'Standard Class', 'customer_name': 'Alejandro Grove', 'segment': 'Consumer', 'state': 'Est', 'country': 'Cameroon', 'market': 'Africa', 'region': 'Africa', 'product_id': 'OFF-CAR-10002031', 'category': 'Office Supplies', 'sub_category': 'Binders', 'product_name': 'Cardinal 3-Hole Punch, Durable', 'sales': 30, 'quantity': 1, 'discount': 0.0, 'profit': 13.92, 'shipping_cost': 2.57, 'order_priority': 'Medium', 'year': 2011}
{'order_id': 'CM-2011-110', 'order_date': '4/10/2011', 'ship_date': '10/10/2011', 'ship_mode': 'Standard Class', 'customer_name': 'Alejandro Grove', 'segment': 'Consumer', 'state': 'Est', 'country': 'Cameroon', 'market': 'Africa', 'region': 'Africa', 'product_id': 'TEC-CAN-10002879', 'category': 'Technology', 'sub_category': 'Copiers', 'product_name': 'Canon Copy Machine, High-Speed', 'sales': 521, 'quantity': 2, 'discount': 0.0, 'profit': 93.72, 'shipping_cost': 30.83, 'order_priority': 'Medium', 'year': 2011}

Current Code:

def run(save_main_session=True):
    beam_options = PipelineOptions()
    args = beam_options.view_as(MyOptions)
    with beam.Pipeline(options=beam_options) as p:
        db_source = db_reader(args.bqprojectid, args.dataset, args.table, args.limit)
        db = db_writer(args.destinationip, args.port, args.destinationusername, args.destinationpassword, args.destinationtable, args.database_name)
        result = (
            p | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query()
                )
               # I would like to remove this step, but it helps with debugging. 
              |'Write to GCS' >> WriteToText('SOMEURI')
              |'Write to Database' >> relational_db.Write(
                  source_config = (db.sink_config()),
                  table_config = (db.table_config())
              ))
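
For reference, relational_db.Write expects the beam-nuggets SourceConfiguration and TableConfiguration objects, so sink_config() and table_config() return something along these lines (a sketch; the driver name and flags are placeholders):

from beam_nuggets.io import relational_db

# Roughly what db_writer builds from the pipeline options; values are placeholders.
source_config = relational_db.SourceConfiguration(
    drivername='postgresql+pg8000',
    host=args.destinationip,
    port=args.port,
    username=args.destinationusername,
    password=args.destinationpassword,
    database=args.database_name,
    create_if_missing=True,
)
table_config = relational_db.TableConfiguration(
    name=args.destinationtable,
    create_if_missing=True,
)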

I went through this answer: "Write BigQuery results to GCS in CSV format using Apache Beam", but was wondering if there was an easier way to transform the output from BigQuery to mirror this structure:

{'name': 'Jan', 'num': 1},
{'name': 'Feb', 'num': 2},

I was considering regex, but I do not think that would be a best practice.

Please let me know if there is anything I can clarify.

Have a great day!

Error Message

 File "/usr/local/lib/python3.9/site-packages/beam_nuggets/io/relational_db.py", line 181, in process
    assert isinstance(element, dict)

I checked the credentials and ruled them out; the structure of the data is the issue. I confirmed this by creating a static PCollection from the beam-nuggets docs, which uploaded successfully (the months example below).

months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
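
The write step for that sanity check was the same relational_db.Write sink, presumably reusing the configs from the main pipeline (sketch):

months | 'Writing months to DB table' >> relational_db.Write(
    source_config=db.sink_config(),
    table_config=db.table_config()
)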

Resolution via the answer provided below: the GCS write and the database write each branch off the BigQuery result, with a mapping step in front of the database write.

def map_to_beam_nuggets_data(element):
    return {
        'order_id': element['order_id'],
        'order_date': element['order_date'],
        'ship_date': element['ship_date'],
        'ship_mode': element['ship_mode'],
        'customer_name': element['customer_name'],
        'segment': element['segment'],
        'state': element['state'],
        'country': element['country'],
        'market': element['market'],
        'region': element['region'],
        'product_id': element['product_id'],
        'category': element['category'],
        'sub_category': element['sub_category'],
        'product_name': element['product_name'],
        'sales': element['sales'],
        'quantity': element['quantity'],
        'discount': element['discount'],
        'profit': element['profit'],
        'shipping_cost': element['shipping_cost'],
        'order_priority': element['order_priority'],
    }


# this is the controller function
def run(save_main_session=True):
    beam_options = PipelineOptions()
    args = beam_options.view_as(MyOptions)
    with beam.Pipeline(options=beam_options) as p:
        db_source = db_reader(args.bqprojectid, args.dataset, args.table, args.limit)
        db = db_writer(args.destinationip, args.port, args.destinationusername, args.destinationpassword, args.destinationtable, args.database_name)
        # Result from BigQuery.
        result = (p | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query()))

        # Sink result to GCS.
        result | 'Write to GCS' >> WriteToText('SOMEURI')

        # Map the result to beam-nuggets dicts and sink it to the database.
        (result
         | 'Map to beam nuggets data' >> beam.Map(map_to_beam_nuggets_data)
         | 'Write to Database' >> relational_db.Write(
                    source_config=(db.sink_config()),
                    table_config=(db.table_config())
                ))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
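
If listing every column by hand in map_to_beam_nuggets_data feels verbose, a dict comprehension over an allow-list of columns does the same job (a sketch; trim the column list to whatever the destination table expects):

WANTED_COLUMNS = [
    'order_id', 'order_date', 'ship_date', 'ship_mode', 'customer_name',
    'segment', 'state', 'country', 'market', 'region', 'product_id',
    'category', 'sub_category', 'product_name', 'sales', 'quantity',
    'discount', 'profit', 'shipping_cost', 'order_priority',
]

def map_to_beam_nuggets_data(element):
    # Keep only the allow-listed columns from the BigQuery row dict.
    return {key: element[key] for key in WANTED_COLUMNS}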

1 Answer


I hope this helps.

In this case, you can use a multi-sink output:

  • Result PCollection from BigQuery
  • Sink the result to GCS
  • Transform the result to the expected output for beam nuggets data and sink it to the database

def run_pipeline(save_main_session=True):
    beam_options = PipelineOptions()
    args = beam_options.view_as(MyOptions)
    with Pipeline(options=beam_options) as p:
        db_source = db_reader(args.bqprojectid, args.dataset, args.table, args.limit)
        db = db_writer(args.destinationip, args.port, args.destinationusername, args.destinationpassword,
                       args.destinationtable, args.database_name)

        # Result from BigQuery.
        result = (p | beam.io.ReadFromBigQuery(use_standard_sql=True, query=db_source.sql_query()))

        # Sink result to GCS.
        result | 'Write to GCS' >> WriteToText('SOMEURI')

        # Map the result to beam-nuggets dicts and sink it to the database.
        (result
         | 'Map to beam nuggets data' >> beam.Map(map_to_beam_nuggets_data)
         | 'Write to Database' >> relational_db.Write(
                    source_config=(db.sink_config()),
                    table_config=(db.table_config())
                )
         )

def map_to_beam_nuggets_data(element: Dict) -> Dict:
    return {
        'name': element['name'],
        'num': element['num'],
    }

To simulate your input data from BigQuery, I created a unit test with a mock and pytest. The ReadFromBigQuery source should return a PCollection of Dicts:

from typing import Dict

import apache_beam as beam
import pytest
from apache_beam.testing.test_pipeline import TestPipeline

def test_pipeline(self):
    mock = [
        {
            'name': 'toto',
            'num': 'num',
            'other_field': 'other'
        }
    ]

    with TestPipeline() as p:
        result = (
                p
                | beam.Create(mock)
                | 'Map to Beam nuggets data' >> beam.Map(self.map_to_beam_nuggets_data)
        )

        result | "Print outputs" >> beam.Map(print)

def map_to_beam_nuggets_data(self, element: Dict) -> Dict:
    return {
        'name': element['name'],
        'num': element['num'],
    }

The result is:

{'name': 'toto', 'num': 'num'}
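
Since both methods take self, they are presumably defined on a test class so pytest can collect them; a minimal skeleton (the class name is mine):

class TestMapToBeamNuggetsData:
    # map_to_beam_nuggets_data and test_pipeline from above go here,
    # indented one level as methods of this class.
    ...
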
Mazlum Tosun
  • Thank you! I am still getting the same error. At first I got NameError: name 'Dict' is not defined with the function in the snippet. I updated the code to this, but still running into the same beam assertion error. I really appreciate the help. ``` def map_to_beam_nuggets_data(element): return { 'order_id': element['order_id'], 'order_date': element['order_date'], } ``` – infiniteloop314 Dec 29 '22 at 15:26
  • I added a code snippet with an unit test to mock your input data from `BigQuery`. I have the expected result. I hope this will help you. – Mazlum Tosun Dec 29 '22 at 15:47