
I'm facing an issue, or rather a challenge, that I haven't been able to figure out yet, and I think I need help setting it up the right way. Any leads will be highly appreciated.

I'm very familiar with AWS, but I recently migrated to GCP and have been tasked with setting up the ETL flow. Apologies if I cause any confusion while explaining the flow ahead.

To give an overview, or a sort of summary, this is how our pipeline is currently set up:

I have a production bucket, say abc_prod, where app data is populated in parquet format. A few of the folders are:

  • abc_prod/installed_apps/
  • abc_prod/contacts/
  • abc_prod/location/
  1. On a daily basis, parquet files are added to the warehouse (GCS) in the respective bucket/folders mentioned above.

  2. GCS to BQ: a Python script reads the parquets from GCS and loads them into BQ in the "inc" dataset with the help of the following code:

     from google.cloud import bigquery

     bq_client = bigquery.Client()
     job_config = bigquery.LoadJobConfig()
     job_config.source_format = bigquery.SourceFormat.PARQUET

     # WRITE_TRUNCATE overwrites any existing data in the destination table
     job_config.write_disposition = "WRITE_TRUNCATE"

     # Starts the load job in the background without waiting for its result
     job = bq_client.load_table_from_uri(src_uri,
                                         table_ref,  ### Reference to the "inc" table, say location_inc, contacts_inc, etc.
                                         job_config=job_config)
    
  3. BQ to BQ: in the final step, the same Python script reads the above BQ table and appends it into another dataset called "main" with the help of the following code snippet:

     bq_client = bigquery.Client()

     job_config = bigquery.QueryJobConfig(destination=table_id,  ### Reference to the "main" table, say location_main, contact_main, etc.
                                          write_disposition="WRITE_APPEND")

     query_job = bq_client.query(query, job_config=job_config)
    

Now, the above pipeline executes without any failures for contacts and installed_apps.

The concern is location. For location, everything executes smoothly up to the second step: the parquets are read and write-truncated into the BQ "inc" dataset. In the third step, the code runs, but the data from the "inc" dataset is not appended into "main". The code produces no error or warning of any kind that could help me debug further, so I'm not able to figure out the exact cause. I have tried passing allow_large_results=True to QueryJobConfig, but "location_main" is still not populated.
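
For context, both load_table_from_uri and query only start their jobs; the script does not wait for them, so a failure in the background job never raises an exception. Below is a rough sketch (reusing the src_uri, table_ref, table_id, and query names from the snippets above as placeholders) of waiting on both jobs so that errors and row counts become visible:

    from google.cloud import bigquery

    bq_client = bigquery.Client()

    # Step 2: load parquet from GCS and block until the load job finishes
    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition="WRITE_TRUNCATE",
    )
    load_job = bq_client.load_table_from_uri(src_uri, table_ref, job_config=load_config)
    load_job.result()  # raises if the load failed
    print("inc rows:", bq_client.get_table(table_ref).num_rows)

    # Step 3: append into the "main" table and block until the query job finishes
    query_config = bigquery.QueryJobConfig(destination=table_id,
                                           write_disposition="WRITE_APPEND")
    query_job = bq_client.query(query, job_config=query_config)
    query_job.result()  # raises if the append failed (e.g. on a schema mismatch)
    print("main rows:", bq_client.get_table(table_id).num_rows)

With the .result() calls in place, a schema or type error in the append step would surface as an exception instead of failing silently.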

One thing I noticed is that if I try it on a few chunks of the location parquets, the table does get populated. It would be a big help if anyone could provide some insight. I've been thinking along these lines: is location_main failing to populate due to the large volume of data, or is it something to do with inconsistent data types in the parquets?
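
One way to check the data-type hypothesis (a rough sketch, assuming the location parquet files have been copied locally, e.g. with gsutil, and that pyarrow is installed) is to compare the schema of each chunk against the first one:

    import glob
    import pyarrow.parquet as pq

    # Compare every chunk's schema against the first one and print any drift
    paths = sorted(glob.glob("location/*.parquet"))  # local copies of abc_prod/location/
    reference = pq.read_schema(paths[0])

    for path in paths[1:]:
        schema = pq.read_schema(path)
        if not schema.equals(reference):
            extra = set(schema.names) - set(reference.names)
            missing = set(reference.names) - set(schema.names)
            print(f"{path}: extra columns {extra}, missing columns {missing}")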

Thanks in advance.

  • Did you check if the `inc` table was populated? Also, you can try running only the `query` from the second step directly in the console to see the output – Damião Martins Jun 06 '22 at 16:24
  • Yes, as I mentioned, everything is fine up to the second step: location_inc is populated. Let me check running the query for the main table directly like you say; I will update here. Thanks. – Syntax Error Jun 06 '22 at 17:05
  • I'm able to run that query. In the console I got the data, and in Python, using *bq_client.query(query).to_dataframe()*, I got the dataframe – Syntax Error Jun 06 '22 at 17:41
  • Have you checked in the logs from [Cloud Logging in BigQuery](https://towardsdatascience.com/bigquery-3-simple-ways-to-audit-and-monitor-your-dabatase-1cf09027f0de)? – Rogelio Monter Jun 10 '22 at 19:55
  • Also, adding a `try... catch` block could help to troubleshoot your issue, as referred to in this [answer](https://stackoverflow.com/a/57280504/13171940) – Rogelio Monter Jun 10 '22 at 19:59
  • Thanks guys for all the help; I have resolved it. I will add the solution here. – Syntax Error Jun 20 '22 at 07:55

1 Answer


I managed to figure out that some parquet files had extra / unexpected columns. The schema for the main (master) table had been created on the basis of the columns that were available at that point in time. I found out that we can handle column addition / relaxation in QueryJobConfig by adding the parameter schema_update_options, which accepts two options: ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'].

E.g.:

    job_config = bigquery.QueryJobConfig(
        destination=table_id,
        write_disposition="WRITE_APPEND",
        schema_update_options=["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"])
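
To confirm which columns were causing the mismatch, a quick check like the following can be run (a rough sketch; the fully-qualified table IDs are placeholders for your own project and dataset names):

    from google.cloud import bigquery

    bq_client = bigquery.Client()

    # Placeholder table IDs: replace "my-project" with the actual project ID
    inc_fields = {f.name for f in bq_client.get_table("my-project.inc.location_inc").schema}
    main_fields = {f.name for f in bq_client.get_table("my-project.main.location_main").schema}

    print("Columns only in inc :", inc_fields - main_fields)
    print("Columns only in main:", main_fields - inc_fields)

Note that the client library also exposes these options as enum constants (bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION and bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION), and schema update options on a query destination are only honoured with WRITE_APPEND (or WRITE_TRUNCATE on a table partition).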