
I want to read a JSON file. Right now, I am using the following logic, which is not very dynamic:

df = spark.read.option("multiline", True).json(loc)
df = df.select("data.*", "event.*", "resource_id", "resource_kind", "resource_uri")

I would have to write `column.*` many times, because the file is heavily nested and contains multiple StructType columns.
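
For example, just to keep both `period_start_on` fields I would end up aliasing every leaf column by hand, something like this (illustrative only):

from pyspark.sql.functions import col

# Manual flattening: every nested field needs an explicit alias,
# otherwise the two period_start_on columns would collide.
df = df.select(
    col("data.accounts.last_accounts.period_start_on")
        .alias("data_accounts_last_accounts_period_start_on"),
    col("data.accounts.next_accounts.period_start_on")
        .alias("data_accounts_next_accounts_period_start_on"),
    # ... and so on for every other leaf field
)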

The schema of the file is as below:

root
 |-- data: struct (nullable = true)
 |    |-- accounts: struct (nullable = true)
 |    |    |-- accounting_reference_date: struct (nullable = true)
 |    |    |    |-- day: string (nullable = true)
 |    |    |    |-- month: string (nullable = true)
 |    |    |-- last_accounts: struct (nullable = true)
 |    |    |    |-- made_up_to: string (nullable = true)
 |    |    |    |-- period_end_on: string (nullable = true)
 |    |    |    |-- period_start_on: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- next_accounts: struct (nullable = true)
 |    |    |    |-- due_on: string (nullable = true)
 |    |    |    |-- overdue: boolean (nullable = true)
 |    |    |    |-- period_end_on: string (nullable = true)
 |    |    |    |-- period_start_on: string (nullable = true)
 |    |    |-- next_due: string (nullable = true)
 |    |    |-- next_made_up_to: string (nullable = true)
 |    |    |-- overdue: boolean (nullable = true)
 |    |-- can_file: boolean (nullable = true)
 |    |-- company_name: string (nullable = true)
 |    |-- company_number: string (nullable = true)
 |    |-- company_status: string (nullable = true)
 |    |-- confirmation_statement: struct (nullable = true)
 |    |    |-- last_made_up_to: string (nullable = true)
 |    |    |-- next_due: string (nullable = true)
 |    |    |-- next_made_up_to: string (nullable = true)
 |    |    |-- overdue: boolean (nullable = true)
 |    |-- date_of_creation: string (nullable = true)
 |    |-- etag: string (nullable = true)
 |    |-- has_charges: boolean (nullable = true)
 |    |-- is_community_interest_company: boolean (nullable = true)
 |    |-- jurisdiction: string (nullable = true)
 |    |-- last_full_members_list_date: string (nullable = true)
 |    |-- links: struct (nullable = true)
 |    |    |-- charges: string (nullable = true)
 |    |    |-- filing_history: string (nullable = true)
 |    |    |-- officers: string (nullable = true)
 |    |    |-- persons_with_significant_control: string (nullable = true)
 |    |    |-- persons_with_significant_control_statements: string (nullable = true)
 |    |    |-- registers: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |-- previous_company_names: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- ceased_on: string (nullable = true)
 |    |    |    |-- effective_from: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |-- registered_office_address: struct (nullable = true)
 |    |    |-- address_line_1: string (nullable = true)
 |    |    |-- address_line_2: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- po_box: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- region: string (nullable = true)
 |    |-- registered_office_is_in_dispute: boolean (nullable = true)
 |    |-- sic_codes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- subtype: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- published_at: string (nullable = true)
 |    |-- timepoint: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- resource_id: string (nullable = true)
 |-- resource_kind: string (nullable = true)
 |-- resource_uri: string (nullable = true)

As a few of the fields have the same name, I need to capture the full field path from the root.

For example, the field period_start_on is present in both last_accounts and next_accounts, so I need to make the column names as below:

data.accounts.last_accounts.period_start_on

data.accounts.next_accounts.period_start_on

I don't think the approach I am taking will get me very far. Could you please suggest an effective way of reading the JSON? Also, how can we identify two fields having the same name?

Thank you

SDS

1 Answer


First, you can read about this topic to help you rename the fields:

Rename nested field in spark dataframe
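
For instance, on Spark 3.1+ a single nested field can be renamed in place with Column.withField and Column.dropFields. A minimal sketch, assuming Spark 3.1+ (the new name name_of_company is illustrative):

from pyspark.sql.functions import col

# Spark 3.1+: copy the field under a new name inside the struct,
# then drop the original field.
df = (df
    .withColumn("data", col("data").withField("name_of_company", col("data.company_name")))
    .withColumn("data", col("data").dropFields("company_name")))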

After this, you can follow an approach that explodes the columns dynamically, using helper functions to find the struct fields and array fields:

from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def find_array_fields(dataframe):
    # Collect the names of all top-level array columns
    array_fields = []
    for field in dataframe.schema.fields:
        if isinstance(field.dataType, ArrayType):
            array_fields.append(field.name)
    return array_fields

def find_struct_fields(dataframe):
    # Collect the names of all top-level struct columns
    struct_fields = []
    for field in dataframe.schema.fields:
        if isinstance(field.dataType, StructType):
            struct_fields.append(field.name)
    return struct_fields

def explode_array_fields(dataframe, array_fields: list):
    # explode_outer keeps rows whose array is null or empty
    for field in array_fields:
        dataframe = dataframe.withColumn(field, explode_outer(field))
    return dataframe

def explode_struct_fields(dataframe, struct_fields: list):
    # Promote each struct's children to top-level columns, prefixed with
    # the parent name so that duplicated field names stay distinct
    for field in struct_fields:
        columns_fields = dataframe.select(f"{field}.*").columns
        for c in columns_fields:
            dataframe = dataframe.withColumn(f"{field}_{c}", col(f"{field}.{c}"))
        dataframe = dataframe.drop(field)
    return dataframe

def explode_all(dataframe):
    # Repeat until no array or struct columns remain; each struct pass can
    # surface new arrays (and vice versa), hence the loop
    while len(find_array_fields(dataframe)) > 0 or len(find_struct_fields(dataframe)) > 0:
        # Try to explode array fields
        array_fields = find_array_fields(dataframe)
        if len(array_fields) > 0:
            dataframe = explode_array_fields(dataframe, array_fields)
        # Try to explode struct fields
        struct_fields = find_struct_fields(dataframe)
        if len(struct_fields) > 0:
            dataframe = explode_struct_fields(dataframe, struct_fields)
    return dataframe

After declaring these functions, you can call them like this:

path = "/your_folder/"
df1 = spark.read.format("your_format").load(path)

df2 = explode_all(df1)
df2.display()  # display() is Databricks-specific; use df2.show() in plain PySpark
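
To the second part of the question (how to identify two fields having the same name), a small recursive walk over df1.schema can list every leaf field with its full path, so duplicated leaf names are easy to spot. A minimal sketch (leaf_paths is just an illustrative name):

from collections import defaultdict
from pyspark.sql.types import ArrayType, StructType

def leaf_paths(schema, prefix=""):
    # Yield the full dotted path of every leaf field in the schema
    for field in schema.fields:
        path = f"{prefix}.{field.name}" if prefix else field.name
        dtype = field.dataType
        # Descend into arrays of structs as well as plain structs
        if isinstance(dtype, ArrayType) and isinstance(dtype.elementType, StructType):
            yield from leaf_paths(dtype.elementType, path)
        elif isinstance(dtype, StructType):
            yield from leaf_paths(dtype, path)
        else:
            yield path

# Group full paths by leaf name; any name mapped to 2+ paths is a duplicate
by_name = defaultdict(list)
for path in leaf_paths(df1.schema):
    by_name[path.rsplit(".", 1)[-1]].append(path)

duplicates = {name: paths for name, paths in by_name.items() if len(paths) > 1}
print(duplicates)
# e.g. 'period_start_on' maps to both
# data.accounts.last_accounts.period_start_on and
# data.accounts.next_accounts.period_start_on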
  • Thank you very much @Afonso, the solution code provided is working just perfectly. – SDS Jun 16 '23 at 16:17
  • One question: can we change the order of the `explode` steps? Can we explode the `StructType` fields first and then the `ArrayType` fields? Will it impact anything? – SDS Jun 16 '23 at 16:51
  • I think there is no impact, but it would be a good test, because there are many possibilities depending on the dataset – Afonso de Paula Feliciano Jun 16 '23 at 17:16
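
Regarding the order question in the comments above, swapping the two passes inside the loop is enough to test it. A sketch, assuming the helper functions defined in the answer:

def explode_all_structs_first(dataframe):
    # Same loop as explode_all, but with the struct pass before the array pass
    while len(find_array_fields(dataframe)) > 0 or len(find_struct_fields(dataframe)) > 0:
        struct_fields = find_struct_fields(dataframe)
        if len(struct_fields) > 0:
            dataframe = explode_struct_fields(dataframe, struct_fields)
        array_fields = find_array_fields(dataframe)
        if len(array_fields) > 0:
            dataframe = explode_array_fields(dataframe, array_fields)
    return dataframe

Since every struct and array is eventually unwrapped either way, the final flattened columns should match up to ordering; only the intermediate shapes differ, so it is still worth comparing both versions on a real dataset, as suggested.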