
I have a DAG that can load data from a JSON file into a BQ table, but many of the fields in my JSON file start with illegal characters. So I created the BQ table with legal column names, and now I need to map the BQ column names to the JSON file's fields, write the result to a new JSON file, and load that into the BQ table, but it's not working.

My JSON file :-

{"ID":"4238382","Title":"El clon Cap\u00edtulo 3","Description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","Program":"El Clon","Season":"1","Episode":"3","Source":"GLOBO TV INTERNACIONAL","Category":"Drama","Syndicator":"CSv2","[CSv2] external_id":"ELCL100002002","[CSv2] pub_win_US_begin":"1661842800","[CSv2] pub_win_US_end":"1754625600","[CSv2] language":"es","[CSv2] title":"El clon Cap\u00edtulo 3","[CSv2] descriptive_title":"Acuerdo de matrimonio","[CSv2] description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","[CSv2] supplier":"GLOBO TV INTERNACIONAL","[CSv2] categories":"Drama","[CSv2] rating":"TV-14","[CSv2] subratings":"D","[CSv2] program_type":"NOVELA","[CSv2] entity":"","[CSv2] exception_countries":"US ,\tUM ,PR ,\tMX ,\tAR ,\tCL ,\tCO ,\tPE ,\tEC ,\tCR ,\tSV ,\tHN ,\tBO ,\tPA ,\tDO ,\tNI ,\tPY ,\tVE ,\tUY ,\tGT","[CSv2] episode_type":"","TMS ID":null,"external_id":"ELCL100002002","Content Type":"Entertainment","Release Year":"2001","sports_event_ID":""}

My BQ table :-

[Screenshot of the BQ table schema]

I created a Python plugin file to map the fields of the JSON file to the BQ columns and store the result in BQ :-

import json
import csv
import logging
import os
import bson.json_util as json_util

from google.cloud import storage
from pydantic import BaseModel, Field, validator


class EventsModel(BaseModel):
    ID: int = None
    Title: str = None
    Description: str = None
    Program: str = None
    Season: int = None
    Episode: int = None
    Source: str = None
    Category: str = None
    Syndicator: str = None
    CSv2_external_id: str = Field(alias="[CSv2] external_id", default=None)
    CSv2_pub_win_US_begin: int = Field(alias="[CSv2] pub_win_US_begin", default=None)
    CSv2_pub_win_US_end: int    = Field(alias="[CSv2] pub_win_US_end", default=None)
    CSv2_language: str = Field(alias="[CSv2] language", default=None)
    CSv2_title: str = Field(alias="[CSv2] title", default=None)
    CSv2_descriptive_title: str = Field(alias="[CSv2] descriptive_title", default=None)
    CSv2_description: str = Field(alias="[CSv2] description", default=None)
    CSv2_supplier: str = Field(alias="[CSv2] supplier", default=None)
    CSv2_categories: str = Field(alias="[CSv2] categories", default=None)
    CSv2_rating: str = Field(alias="[CSv2] rating", default=None)
    CSv2_subratings: str = Field(alias="[CSv2] subratings", default=None)
    CSv2_program_type: str = Field(alias="[CSv2] program_type", default=None)
    CSv2_entity: str = Field(alias="[CSv2] entity", default=None)
    CSv2_exception_countries: str = Field(alias="[CSv2] exception_countries", default=None)
    CSv2_episode_type: str = Field(alias="[CSv2] episode_type", default=None)
    TMS_ID: str = Field(alias="TMS ID", default= None)
    external_id: str = None
    Content_Type: str = None
    Release_Year: int = None
    sports_event_ID: str = None

    @validator(
        "TMS_ID",
        pre=True,
        always=True,
    )
    def is_date(cls, v):
        try:
            if type(v) == str:
                v = None if v.lower() =="null" else v
            else:
                raise ValueError
        except ValueError:
            v = "null"
        return v


def map_keys(bucket_name, file_path, list_of_files): #pass the folder as an argument
            
    logging.info(f"bucket_name: {bucket_name}")
    logging.info(f"file_path: {file_path}")

    storage_client = storage.Client()

    
    path = f'''{bucket_name}'''
    

    logging.info(f"list_of_files from the DAG: {list_of_files}")
    blobs  = storage_client.list_blobs(
    bucket_or_name=path
    )

    file = ""
    logging.info(f"blob {blobs}")

    for blob in blobs:
        if not blob.name.endswith("/"):
            file = blob.name
            bucket = storage_client.get_bucket(bucket_name)
        
        #TODO: iterate the files into the path and parse using the new model
        
        logging.info(f"file: {file}")

        
        with open(file, "w") as j_file:
                    j_file.write(json_util.dumps(file))
                    j_file.write("\n")
        # model mapper
        new_model = EventsModel.parse_obj(j_file)
        new_model = new_model.dict()

        with open(new_model, "w") as file_transformed:
            file_transformed.write(json.dumps(new_model))
            file_transformed.write("\n")
        
        blob = bucket.blob(f"test_file")
        blob.upload_from_filename(file_transformed)
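
The idea is that `parse_obj` accepts the original (aliased) keys and `.dict()` gives back the BQ-legal field names. As a minimal standalone sketch of that round trip (assuming pydantic v1, which is what the `Field(alias=...)` / `@validator` syntax above targets; the sample keys are taken from the JSON record shown earlier):

import json

sample = {
    "ID": "4238382",
    "Title": "El clon Cap\u00edtulo 3",
    "[CSv2] external_id": "ELCL100002002",
    "[CSv2] rating": "TV-14",
    "TMS ID": None,
}

record = EventsModel.parse_obj(sample)  # aliased keys ("[CSv2] ...", "TMS ID") are accepted here
print(json.dumps(record.dict()))        # output keys are the Python field names (CSv2_external_id, TMS_ID, ...)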

I added it to the DAG :-

map_json_keys_with_BQ_columns = PythonOperator(
        task_id="map_json_keys_with_BQ_columns",
        retries=0,
        python_callable=map_keys,
        op_kwargs={
            "bucket_name": mcp_bucket,
            "file_path": mcp_source_folder,
            "list_of_files": source_files
        },
        dag=dag,
    )


mcp_ingestion_to_bq = GCSToBigQueryOperator(
        task_id="mcp_ingestion_to_bq",
        retries=0,
        dag=dag,
        bucket=mcp_bucket,
        source_objects=f"{mcp_source_folder}*.json",
        source_format="NEWLINE_DELIMITED_JSON",
        #skip_leading_rows=16,
        destination_project_dataset_table=destination_bq_table,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_NEVER",
        
        autodetect=True
    )

But it's not working. The task fails with an error; the traceback is quoted in the comments below.

Priya
  • have you tried changing the invalid json fields programmatically? – Sakshi Gatyan Sep 28 '22 at 08:08
  • @SakshiGatyan : Trying to map the fields by python. Hence created Python Plugin file but getting error. [2022-09-27 04:57:04,672] {taskinstance.py:1465} ERROR - Task failed with exception. File "/home/airflow/gcs/plugins/mcp_plugins.py", line 107, in map_keys EventsModel.parse_obj(json.loads(j_file)) File "/opt/python3.8/lib/python3.8/json/__init__.py", line 341, in loads raise TypeError(f'the JSON object must be str, bytes or bytearray, ' TypeError: the JSON object must be str, bytes or bytearray, not TextIOWrapper [2022-09-27 04:57:04,675] INFO - Marking task as FAILED – Priya Sep 28 '22 at 16:32
  • I believe you're trying to load json data into BQ. Please correct me if i'm wrong. Why don't you operate on your json file directly [example](https://stackoverflow.com/questions/13391579/how-to-rename-json-key) ie. update it with valid names and then load it to BQ directly without creating any table beforehand.? – Sakshi Gatyan Sep 29 '22 at 06:13
  • @Ash_max : In my case it's not possible because first we create table in BQ by using Palomi(it's mandatory) and then only I can load the data in BQ table – Priya Oct 01 '22 at 04:55
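
For reference, the direct key-rename approach suggested in the comments above (operating on the JSON file itself, without a Pydantic model) could look roughly like this; the regex rule and the local file names are illustrative assumptions, and the rule simply happens to reproduce the column names used above:

import json
import re

def sanitize_key(key: str) -> str:
    # e.g. "[CSv2] external_id" -> "CSv2_external_id", "TMS ID" -> "TMS_ID"
    return re.sub(r"[^0-9a-zA-Z_]+", "_", key).strip("_")

# "source.json" / "renamed.json" are placeholder paths for a local copy of the file
with open("source.json") as src:
    records = [json.loads(line) for line in src if line.strip()]

with open("renamed.json", "w") as dst:
    for record in records:
        dst.write(json.dumps({sanitize_key(k): v for k, v in record.items()}))
        dst.write("\n")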

1 Answer


I hope this helps.

Your `EventsModel` is a Pydantic model that applies a validator to its fields.

I think your problem occurs in this part of the code:

for blob in blobs:
    if not blob.name.endswith("/"):
        file = blob.name
        bucket = storage_client.get_bucket(bucket_name)

    #TODO: iterate the files into the path and parse using the new model

    logging.info(f"file: {file}")

    with open(file, "w") as j_file:
        j_file.write(json_util.dumps(file))
        j_file.write("\n")

    # model mapper
    new_model = EventsModel.parse_obj(json.loads(j_file))
    new_model = new_model.dict()

    with open(new_model, "w") as file_transformed:
        file_transformed.write(json.dumps(new_model))
        file_transformed.write("\n")

    blob = bucket.blob(f"test_file")
    blob.upload_from_filename(file_transformed)

Pydantic indicates that you are operating on a closed file: `IO operation on closed file`. If you apply the transformation `EventsModel.parse_obj(json.loads(j_file))` inside the `with open` block (add one level of indentation), I think the file is not closed in this case.
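
To make that concrete, a rough sketch of the loop in that direction could look like the code below. It reuses the imports and `EventsModel` from the question, reads the blob's content with `download_as_text()` (assuming a reasonably recent google-cloud-storage) instead of re-opening a local path, and turns each newline-delimited record into a dict with `json.loads` before calling `parse_obj`; the local path is an illustrative assumption.

bucket = storage_client.get_bucket(bucket_name)

for blob in blobs:
    if blob.name.endswith("/"):
        continue

    content = blob.download_as_text()      # read the object's content, no local open() on the blob name
    local_path = "/tmp/mapped.json"        # illustrative local path

    with open(local_path, "w") as file_transformed:
        for line in content.splitlines():
            if not line.strip():
                continue
            record = json.loads(line)      # parse_obj needs a dict, not a file object
            new_model = EventsModel.parse_obj(record)
            file_transformed.write(json.dumps(new_model.dict()))
            file_transformed.write("\n")

    bucket.blob("test_file").upload_from_filename(local_path)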

Can you test in this direction, please?

Mazlum Tosun
  • Getting an error :- File "pydantic/main.py", line 517, in pydantic.main.BaseModel.parse_obj io.UnsupportedOperation: not readable File "pydantic/main.py", line 520, in pydantic.main.BaseModel.parse_obj pydantic.error_wrappers.ValidationError: 1 validation error for EventsModel __root__ EventsModel expected dict not TextIOWrapper (type=type_error) – Priya Sep 26 '22 at 02:22
  • I edited my answer with the following change : `EventsModel.parse_obj(json.loads(j_file))` The `parse_obj` method in `Pydantic` should take a `Dict` not a file – Mazlum Tosun Sep 26 '22 at 15:18
  • I am still getting error related to Wrapper. [2022-09-27 04:57:04,672] {taskinstance.py:1465} ERROR - Task failed with exception. File "/home/airflow/gcs/plugins/mcp_plugins.py", line 107, in map_keys EventsModel.parse_obj(json.loads(j_file)) File "/opt/python3.8/lib/python3.8/json/__init__.py", line 341, in loads raise TypeError(f'the JSON object must be str, bytes or bytearray, ' TypeError: the JSON object must be str, bytes or bytearray, not TextIOWrapper [2022-09-27 04:57:04,675] INFO - Marking task as FAILED. Is there any other solution to insert data in BQ? – Priya Sep 27 '22 at 05:00