
I have ~50k compressed (gzip) JSON files daily that need to be uploaded to BigQuery with some transformation, no API calls. The files may be up to 1 GB in size.

What is the most cost-efficient way to do it?

I will appreciate any help.

LifeIsGoodMF
  • Without any other scope and context details, the first option to think about is the BigQuery Data Transfer Service - https://cloud.google.com/bigquery-transfer/docs/introduction and https://cloud.google.com/bigquery-transfer/docs/cloud-storage-transfer – al-dann Apr 19 '22 at 09:29
  • Thank you @al-dann, Transfer Service is good but it does not support transformation – LifeIsGoodMF Apr 19 '22 at 11:50
  • What's the compression format? What's the transformation that you want to achieve (do you need to perform API calls)? – guillaume blaquiere Apr 19 '22 at 12:00
  • gzip, no API calls – LifeIsGoodMF Apr 19 '22 at 18:40
  • A few questions/assumptions about the context: 1/ the JSON structures are identical; 2/ all data are to be appended to one BQ table; 3/ if a file is not loaded, it is copied into an 'error' GCS bucket and handled manually; 4/ it is OK to use Firestore for process state management – al-dann Apr 22 '22 at 07:44
  • 1. Most of the time; if a record has a different structure it should be ignored. 2. Yes. 3. Could be - are there other options? But the original files should stay where they are. 4. Yes. – LifeIsGoodMF Apr 22 '22 at 09:43

2 Answers


The most efficient way is to use Cloud Data Fusion. I would suggest the approach below:

  1. Create a Cloud Function triggered on every new file upload to uncompress the file (a sketch follows this list).
  2. Create a Data Fusion pipeline with the GCS file as source and BigQuery as sink, applying the desired transformation.
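
A minimal sketch of the step-1 function, assuming a hypothetical staging bucket ('your_staging_bucket') that the Data Fusion pipeline would read from; bucket and file names are placeholders:

import gzip

from google.cloud import storage


def decompress(event, context):
    # Triggered by the storage object finalize event on the landing bucket
    if not event['name'].endswith('.json.gz'):
        return

    client = storage.Client()
    src_blob = client.bucket(event['bucket']).blob(event['name'])

    # Decompress in memory; for files approaching 1 GB (compressed), check the
    # function's memory limit or switch to a streaming approach
    data = gzip.decompress(src_blob.download_as_bytes())

    # 'your_staging_bucket' is a placeholder for the bucket Data Fusion reads from
    dst_blob = client.bucket('your_staging_bucket').blob(event['name'][:-3])
    dst_blob.upload_from_string(data, content_type='application/json')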

Refer to my YouTube video below: https://youtu.be/89of33RcaRw

Vishal Bulbule

Here is, for example, one way: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json

Quickly looking over it, however, one can see that there are some specific limitations. So perhaps simplicity, customization, and maintainability of the solution should also be added to your "cost" function.
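
If your files are already newline-delimited JSON and dropping fields that don't match the schema is transformation enough, the plain load described at that link can be driven from Python roughly like this (project, dataset, table and bucket names are placeholders):

from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,                # or pass an explicit schema
    ignore_unknown_values=True,     # drop fields that don't match the schema
    max_bad_records=1000,           # tolerate some malformed records
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# gzip-compressed newline-delimited JSON can be loaded straight from GCS
load_job = client.load_table_from_uri(
    'gs://your_bucket_here/*.json.gz',
    'your_project_id.your_dataset.your_table',
    job_config=job_config,
)
load_job.result()  # wait for the job to finish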

Not knowing some details (for example, read the "Limitations" section under the link above; what stack you have or are willing/able to use; file names; whether your files have nested fields; etc.), my first thought is a Cloud Function (https://cloud.google.com/functions/pricing) that is "listening" (event type = Finalize/Create) to the Cloud Storage bucket where your files land. If you go this route, put your storage and function in the same region if possible, which will make it cheaper.

If you can code Python, here is some starter code:


main.py


import io

import pandas as pd
from google.cloud import storage


def process(event, context):
    file = event

    # check if it's your file; you could also check for patterns in the name
    if file['name'] == 'YOUR_FILENAME':

        try:
            working_file = file['name']

            storage_client = storage.Client()
            bucket = storage_client.get_bucket('your_bucket_here')
            blob = bucket.blob(working_file)

            # https://stackoverflow.com/questions/49541026/how-do-i-unzip-a-zip-file-in-google-cloud-storage
            zipbytes = io.BytesIO(blob.download_as_bytes())
            
            #print for logging
            print(f"file downloaded, {working_file}") 
            
            # read file as df --- see the docs at https://pandas.pydata.org/docs/reference/api/pandas.read_json.html
            # if the JSON is newline-delimited, add lines=True; if nested, you might
            # need to go text --> dictionary and do some preprocessing first
            df = pd.read_json(zipbytes, compression='gzip')

            #write processed to big query
            df.to_gbq(destination_table ='your_dataset.your_table',
                      project_id ='your_project_id',
                      if_exists = 'append')

            print(f"table bq created, {working_file}")

            # if you want to delete the processed file from storage to save on
            # storage costs, uncomment the 2 lines below
            # blob.delete()
            # print(f"blob deleted, {working_file}")


        except Exception as e:
            print(f"exception occured {e}, {working_file}")

requirements.txt

# Function dependencies, for example:
# package>=version
google-cloud-storage
google-cloud-bigquery
pandas
pandas-gbq


PS: Some alternatives include

  1. Starting up a VM, running your script on a schedule, and shutting the VM down once the process is done (for example Cloud Scheduler --> Pub/Sub --> Cloud Function --> which starts up your VM --> which then runs your script)
  2. Using App Engine to run your script (similar)
  3. Using Cloud Run to run your script (similar)
  4. Using Composer/Airflow (not similar to 1, 2 & 3) [it could use all types of approaches including data transfers, etc.; just not sure what stack you are trying to use or what you already have running]
  5. Scheduling a Vertex AI Workbench notebook (not similar to 1, 2 & 3; basically write a Jupyter notebook and schedule it to run in Vertex AI)
  6. Trying to query the files directly (https://cloud.google.com/bigquery/external-data-cloud-storage#bq_1) and scheduling that query (https://cloud.google.com/bigquery/docs/scheduling-queries) to run (again, not sure about your overall pipeline; a sketch follows this list)
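
For #6, a rough sketch of defining an external table over the gzipped files (all names are placeholders; the transformation would then live in the SQL of the scheduled query):

from google.cloud import bigquery

client = bigquery.Client()

# Placeholders: replace the bucket path and table id with your own
external_config = bigquery.ExternalConfig('NEWLINE_DELIMITED_JSON')
external_config.source_uris = ['gs://your_bucket_here/*.json.gz']
external_config.compression = 'GZIP'
external_config.autodetect = True

table = bigquery.Table('your_project_id.your_dataset.your_external_table')
table.external_data_configuration = external_config
client.create_table(table, exists_ok=True)

# A scheduled query would then SELECT from this external table, apply the
# transformation in SQL, and append the result into the destination table.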

Setup for all (except #5 & #6) just in technical debt to me is not worth it if you can get away with functions



Best of luck,

Yev Guyduy