
I am trying to write BigQuery table records as a JSON file in a GCS bucket using Apache Beam in Python.

I have a BigQuery table, my_project.my_dataset.my_table, like this:

[screenshot of the table omitted; its columns include id, name, address, phn, country and age]

I wish to write the table records into a JSON file at the GCS location "gs://my_core_bucket/data/my_data.json"

Expected JSON format:


[
    {"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}},
    {"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}},
    {"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}},
    {"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}
]

But with my current implementation of the Apache Beam pipeline, the JSON file created at "gs://my_core_bucket/data/my_data.json" has newline-delimited entries like this:

{"id":"1","values":{"name":"abc","address":"Mumbai","phn":"1111111111"}}
{"id":"2","values":{"name":"def","address":"Kolkata","phn":"2222222222"}}
{"id":"3","values":{"name":"ghi","address":"Chennai","phn":"3333333333"}}
{"id":"4","values":{"name":"jkl","address":"Delhi","phn":"4444444444"}}

How do I create a clean JSON file with the BigQuery records as elements of a single JSON array?

Here is my pipeline code.

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # columns present in this record, e.g. ["id", "name", "address", "phn"]

        values = {}

        for key in rec_keys:
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            # ReadFromBigQuery is a PTransform, so apply it directly (no beam.io.Read wrapper)
            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # serialize each record to JSON and write it out; WriteToText writes
            # one element per line, which is why the output is newline-delimited
            json_data = prepared_data | 'JSON format' >> beam.Map(json.dumps)
            json_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
  • The output makes perfect sense, because Apache Beam will map every element of the input `PCollection` to JSON as indicated. I am not sure if it will work, but please try `CombineGlobally` instead: `prepared_data | 'JSON format' >> beam.CombineGlobally(json.dumps)`. See the [relevant docs](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/). – jccampanero Jul 12 '21 at 18:04
  • @jccampanero - It is not working with beam.CombineGlobally(json.dumps). I see the error "Object of type _ReiterableChain is not JSON serializable" when I try CombineGlobally. – Gopinath S Jul 12 '21 at 18:29
  • Please, can you try the provided answer? I hope it helps. Please be aware that I do not know the possible performance impact of writing the information in this way. – jccampanero Jul 12 '21 at 22:08

2 Answers


As suggested in the comments, please try combining all the results into one element. In order to successfully serialize any set obtained as a result of the combination process, you can use a custom serializer.

Your code can look like this:

import os
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


# Based on https://stackoverflow.com/questions/8230315/how-to-json-serialize-sets
class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)
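
# Usage example for SetEncoder (illustrative, not from the original answer):
#   json.dumps({"tags": {"x", "y"}}, cls=SetEncoder)  ->  '{"tags": ["x", "y"]}'
#   (the order of set elements in the serialized output is not guaranteed)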


# custom CombineFn that accumulates all elements into a single list
class ListCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for accum in accumulators:
            merged += accum
        return merged

    def extract_output(self, accumulator):
        return accumulator



class PrepareData(beam.DoFn):
    def process(self, record):  # sample record - {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}        
        rec_columns = [ "id", "name", "address", "phn", "country", "age"]   # all columns of the bigquery table 

        rec_keys = list(record.keys())  # columns present in this record, e.g. ["id", "name", "address", "phn"]

        values = {}

        for key in rec_keys:
            if key != "id" and key in rec_columns:
                values[key] = record[key]

        return [{"id": record['id'], "values": values}]


class MainClass:
    def run_pipe(self):
        try:        
            project = "my_project"
            dataset = "my_dataset"
            table = "my_table"
            region = "us-central1"
            job_name = "create-json-file"
            temp_location = "gs://my_core_bucket/dataflow/temp_location/"
            runner = "DataflowRunner"
            
            # set pipeline options
            argv = [
                f'--project={project}',
                f'--region={region}',
                f'--job_name={job_name}',
                f'--temp_location={temp_location}',
                f'--runner={runner}'
            ]
            
            # json file name
            file_name = "gs://my_core_bucket/data/my_data"

            # create pipeline 
            p = beam.Pipeline(argv=argv)

            # query to read table data
            query = f"SELECT id, name, address, phn FROM `{project}.{dataset}.{table}` LIMIT 4"

            # ReadFromBigQuery is a PTransform, so apply it directly (no beam.io.Read wrapper)
            bq_data = p | 'Read Table' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)

            # bq_data will be in the form 
            # {"id": "1", "name": "abc", "address": "Mumbai", "phn": "1111111111"}
            # {"id": "2", "name": "def", "address": "Kolkata", "phn": "2222222222"}
            # {"id": "3", "name": "ghi", "address": "Chennai", "phn": "3333333333"}
            # {"id": "4", "name": "jkl", "address": "Delhi", "phn": "4444444444"}
            
            # alter data in the form needed for downstream process
            prepared_data = bq_data | beam.ParDo(PrepareData())

            # combine all the results into a single list element
            # see https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/
            combined_data = prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())

            # serialize the combined list as one JSON array; the custom encoder
            # handles any sets that may appear in the values
            json_data = combined_data | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder)
            json_data | 'Write Output' >> beam.io.WriteToText(file_name, file_name_suffix=".json", shard_name_template='')

            # execute pipeline
            p.run().wait_until_finish()
        except Exception as e:
            logging.error(f"Exception in run_pipe - {str(e)}")


if __name__ == "__main__":
    main_cls = MainClass()
    main_cls.run_pipe()
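
If you prefer not to maintain a custom CombineFn, Beam also ships a built-in beam.combiners.ToList() transform that performs essentially the same accumulation. The following is a minimal sketch of the fully chained pipeline under that assumption; it reuses the bq_data, PrepareData, SetEncoder and file_name names from the code above, and the chaining is the important part, so that the combined list (rather than the per-record PCollection) is what reaches the sink:

# Sketch: built-in ToList combiner instead of the custom ListCombineFn.
# The whole table ends up in one list element, which is serialized as a
# single JSON array and written to a single output file.
(
    bq_data
    | 'Prepare records' >> beam.ParDo(PrepareData())
    | 'Combine to list' >> beam.combiners.ToList()
    | 'Serialize JSON array' >> beam.Map(json.dumps, cls=SetEncoder)
    | 'Write single file' >> beam.io.WriteToText(file_name,
                                                 file_name_suffix=".json",
                                                 shard_name_template='')
)

Either way, note that combining everything into one element forces the whole result through a single worker, so for a very large table this pattern (like any single-array output) can become a memory and performance bottleneck.
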
jccampanero
  • tried the new approach to combine but received this error " TypeError: descriptor 'union' requires a 'set' object but received a 'dict'" – Gopinath S Jul 13 '21 at 08:24
  • Hi @GopinathS. I see. Sorry, I am unable to test your actual code, and I tested it with simple test data. Please, can you try the updated solution? I created a custom combine function in order to merge your results. It probably will work but, in any case, I do not know if it is the best option. Anyway, I hope it helps. – jccampanero Jul 13 '21 at 09:51
  • I had to alter my pipeline like this for the "ListCombineFn" to work: `( bq_data | 'Prepare for push' >> beam.ParDo(PrepForPush(CONFIG)) | 'Combine results' >> beam.CombineGlobally(ListCombineFn()) | 'JSON format' >> beam.Map(json.dumps, cls=SetEncoder) | 'Write Output' >> beam.io.WriteToText(file_name, shard_name_template='') )`. On its own, the line `prepared_data | 'Combine results' >> beam.CombineGlobally(ListCombineFn())` still creates the JSON file with just dicts. – Gopinath S Jul 13 '21 at 15:36
  • Hi @GopinathS. You mean that the pipeline can be simplified? Did it work then, or are you still facing the same error? I am sorry GopinathS, but as I told you, I am unable to test with your actual data; I am just testing with simple use cases. I apologize if the answer is not as accurate as I had wished. – jccampanero Jul 13 '21 at 15:55
  • Hi @jccampanero - It worked, and I am able to create the JSON file as an array of elements. The custom combine function seems to be the fix for the issue. – Gopinath S Jul 13 '21 at 18:09
  • That is great @GopinathS. I am very happy to hear that it worked. As I mentioned, just pay attention to the possible performance impact. Please do not hesitate to contact me again if you think that I can be of any help. – jccampanero Jul 13 '21 at 18:12

You can build the JSON directly in BigQuery and simply write the result as-is with Dataflow.

Only change the query:

query = f"SELECT ARRAY_AGG(str) FROM (SELECT STRUCT(id AS id, name AS name, address AS address, phn AS phn) AS str FROM `{project}.{dataset}.{table}` LIMIT 4)"

Keep in mind that:

  • The BigQuery processing will almost always be faster and cheaper than the equivalent processing in Dataflow (or on other equivalent compute)
  • Dataflow's text sink naturally writes newline-delimited JSON (one object per line); a file that is one big top-level JSON array is not its native output format
guillaume blaquiere
  • I will be using the JSON file downstream, so creating the JSON files in the expected format is necessary in my case. Also, the data is huge; I restricted it to 4 records only for the sake of explanation. – Gopinath S Jul 13 '21 at 09:16
  • Basically, I want to convert the whole BigQuery table into JSON files and use them later on for processing. – Gopinath S Jul 13 '21 at 09:20