I am trying to convert 2 levels of nested JSON into a PySpark dataframe. Below is what my JSON schema looks like:
I am always getting nulls when converting to a Spark dataframe for the products struct, which is the last level of the nested JSON.
If the structure is fixed as shown in the description, then try this:
df.select(
    "b_Code",
    "b_Key",
    "r_data.s_key",
    "r_data.s_Code",
    "r_data.products.s_key",
    "r_data.products.s_Code",
    "r_data.products.s_Type",
    "r_data.products.r_type",
    "r_data.products.sl",
    "r_data.products.sp",
)
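Note that several of the selected fields share a name across levels (s_key and s_Code each appear twice), so the flattened result will contain duplicate column names. A minimal sketch of the same select with aliases to keep the names unique, assuming the schema from the question (the alias names here are just illustrative):

from pyspark.sql.functions import col

flat = df.select(
    col("b_Key"),
    col("b_Code"),
    col("r_data.s_key").alias("r_s_key"),        # alias names are illustrative
    col("r_data.s_Code").alias("r_s_Code"),
    col("r_data.products.s_key").alias("p_s_key"),
    col("r_data.products.s_Code").alias("p_s_Code"),
    col("r_data.products.s_Type").alias("p_s_Type"),
    col("r_data.products.r_type").alias("p_r_type"),
    col("r_data.products.sl").alias("p_sl"),
    col("r_data.products.sp").alias("p_sp"),
)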
Here is a function that will flatten a nested dataframe regardless of the level of nesting in the JSON:
from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Each stack entry is (tuple of parent field names, dataframe projected at that path).
    stack = [((), nested_df)]
    columns = []
    while len(stack) > 0:
        parents, df = stack.pop()
        # Non-struct columns are leaves: select them by their dotted path and
        # alias them with underscores so the flat names stay readable and unique.
        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]
        # Struct columns need to be expanded one more level.
        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]
        columns.extend(flat_cols)
        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))
    return nested_df.select(columns)
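A quick usage sketch, assuming df is the dataframe read from the JSON described in the question:

flat = flatten_df(df)
flat.printSchema()
# With the question's schema, the flattened columns should come out as:
# b_Key, b_Code, r_data_s_key, r_data_s_Code,
# r_data_products_s_key, r_data_products_s_Code, r_data_products_s_Type,
# r_data_products_r_type, r_data_products_sl, r_data_products_sp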
Have you tried to force the schema?
You can try this because, apparently, you have a different schema in each file, so enforcing the proper schema should solve your problem:
from pyspark.sql import types as T

schema = T.StructType(
    [
        T.StructField("b_key", T.IntegerType()),
        T.StructField("b_code", T.StringType()),
        T.StructField(
            "r_data",
            T.StructType(
                [
                    T.StructField("s_key", T.IntegerType()),
                    T.StructField("s_code", T.StringType()),
                    T.StructField(
                        "products",
                        T.StructType(
                            [
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_code", T.StringType()),
                                T.StructField("s_type", T.StringType()),
                                T.StructField("r_type", T.StringType()),
                                T.StructField("sl", T.DecimalType()),
                                T.StructField("sp", T.IntegerType()),
                            ]
                        ),
                    ),
                ]
            ),
        ),
    ]
)
df = spark.read.json("path/to/file.json", schema=schema)
From there, you do not have any arrays, so you can simply select
the nested columns to flatten. For example:
df.select(
    "r_data.*"
)
This will flatten the r_data struct column, and you will end up with 3 columns (s_key, s_code, and products).
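To flatten all levels in one select instead, here is a sketch, assuming the lower-case field names of the enforced schema above (the two s_key fields will again yield duplicate column names unless you alias them):

df.select(
    "b_key",
    "b_code",
    "r_data.s_key",
    "r_data.s_code",
    "r_data.products.*",  # star-expands the innermost struct
)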