I have a DAG which can insert data of JSON file in BQ table but I have a JSON file and many fields are started with illegal characters. So I have created BQ table with legal column names but I need to map BQ column names with JSON file fields and store it in a JSON file, to upload it into BQ table but it's not working.
My JSON file :-
{"ID":"4238382","Title":"El clon Cap\u00edtulo 3","Description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","Program":"El Clon","Season":"1","Episode":"3","Source":"GLOBO TV INTERNACIONAL","Category":"Drama","Syndicator":"CSv2","[CSv2] external_id":"ELCL100002002","[CSv2] pub_win_US_begin":"1661842800","[CSv2] pub_win_US_end":"1754625600","[CSv2] language":"es","[CSv2] title":"El clon Cap\u00edtulo 3","[CSv2] descriptive_title":"Acuerdo de matrimonio","[CSv2] description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","[CSv2] supplier":"GLOBO TV INTERNACIONAL","[CSv2] categories":"Drama","[CSv2] rating":"TV-14","[CSv2] subratings":"D","[CSv2] program_type":"NOVELA","[CSv2] entity":"","[CSv2] exception_countries":"US ,\tUM ,PR ,\tMX ,\tAR ,\tCL ,\tCO ,\tPE ,\tEC ,\tCR ,\tSV ,\tHN ,\tBO ,\tPA ,\tDO ,\tNI ,\tPY ,\tVE ,\tUY ,\tGT","[CSv2] episode_type":"","TMS ID":null,"external_id":"ELCL100002002","Content Type":"Entertainment","Release Year":"2001","sports_event_ID":""}
My BQ table :-
Created a python plugin file to map the fields of JSON file with BQ columns and store it in BQ :-
import json
import csv
import logging
import os
import bson.json_util as json_util
from google.cloud import storage
from pydantic import BaseModel, Field, validator
class EventsModel(BaseModel):
ID: int = None
Title: str = None
Description: str = None
Program: str = None
Season: int = None
Episode: int = None
Source: str = None
Category: str = None
Syndicator: str = None
CSv2_external_id: str = Field(alias="[CSv2] external_id", default=None)
CSv2_pub_win_US_begin: int = Field(alias="[CSv2] pub_win_US_begin", default=None)
CSv2_pub_win_US_end: int = Field(alias="[CSv2] pub_win_US_end", default=None)
CSv2_language: str = Field(alias="[CSv2] language", default=None)
CSv2_title: str = Field(alias="[CSv2] title", default=None)
CSv2_descriptive_title: str = Field(alias="[CSv2] descriptive_title", default=None)
CSv2_description: str = Field(alias="[CSv2] description", default=None)
CSv2_supplier: str = Field(alias="[CSv2] supplier", default=None)
CSv2_categories: str = Field(alias="[CSv2] categories", default=None)
CSv2_rating: str = Field(alias="[CSv2] rating", default=None)
CSv2_subratings: str = Field(alias="[CSv2] subratings", default=None)
CSv2_program_type: str = Field(alias="[CSv2] program_type", default=None)
CSv2_entity: str = Field(alias="[CSv2] entity", default=None)
CSv2_exception_countries: str = Field(alias="[CSv2] exception_countries", default=None)
CSv2_episode_type: str = Field(alias="[CSv2] episode_type", default=None)
TMS_ID: str = Field(alias="TMS ID", default= None)
external_id: str = None
Content_Type: str = None
Release_Year: int = None
sports_event_ID: str = None
@validator(
"TMS_ID",
pre=True,
always=True,
)
def is_date(cls, v):
try:
if type(v) == str:
v = None if v.lower() =="null" else v
else:
raise ValueError
except ValueError:
v = "null"
return v
def map_keys(bucket_name, file_path, list_of_files): #pass the folder as an argument
logging.info(f"bucket_name: {bucket_name}")
logging.info(f"file_path: {file_path}")
storage_client = storage.Client()
path = f'''{bucket_name}'''
logging.info(f"list_of_files from the DAG: {list_of_files}")
blobs = storage_client.list_blobs(
bucket_or_name=path
)
file = ""
logging.info(f"blob {blobs}")
for blob in blobs:
if not blob.name.endswith("/"):
file = blob.name
bucket = storage_client.get_bucket(bucket_name)
#TODO: iterate the files into the path and parse using the new model
logging.info(f"file: {file}")
with open(file, "w") as j_file:
j_file.write(json_util.dumps(file))
j_file.write("\n")
# mapper del modelo
new_model = EventsModel.parse_obj(j_file)
new_model = new_model.dict()
with open(new_model, "w") as file_transformed:
file_transformed.write(json.dumps(new_model))
file_transformed.write("\n")
blob = bucket.blob(f"test_file")
blob.upload_from_filename(file_transformed)
Added it in DAG :-
map_json_keys_with_BQ_columns = PythonOperator(
task_id="map_json_keys_with_BQ_columns",
retries=0,
python_callable=map_keys,
op_kwargs={
"bucket_name": mcp_bucket,
"file_path": mcp_source_folder,
"list_of_files": source_files
},
dag=dag,
)
mcp_ingestion_to_bq = GCSToBigQueryOperator(
task_id="mcp_ingestion_to_bq",
retries=0,
dag=dag,
bucket=mcp_bucket,
source_objects=f"{mcp_source_folder}*.json",
source_format="NEWLINE_DELIMITED_JSON",
#skip_leading_rows=16,
destination_project_dataset_table=destination_bq_table,
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_NEVER",
autodetect=True
)