How to tidy a DataFrame (JSON) with dynamic nested structs / arrays in PySpark?
I have 10000 JSON files, each with static and dynamic field names as described below.
- Static names: data, label, units, date, val, num (can be hardcoded)
- Dynamic names: data_1_a, data_1000_xyz, name_1a, name_1b, name_10000_xyz, A, B (cannot be hardcoded, as there can be up to 10000 names / data sub-categories; see the schema-inspection sketch below)
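Since these names only exist in the data, they have to be discovered at runtime. A minimal sketch of pulling them out of the inferred schema (assuming only the layout shown below, with df being the input frame):
# Sketch: enumerate the dynamic names from the inferred schema.
# StructType supports lookup by field name and fieldNames().
level_1_names = df.schema["data"].dataType.fieldNames()
# e.g. ['data_1000_xyz', 'data_1_a']
level_2_names = {n: df.schema["data"].dataType[n].dataType.fieldNames()
                 for n in level_1_names}
# e.g. {'data_1000_xyz': ['name_10000_xyz', 'name_1b'], 'data_1_a': ['name_1a']}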
Input df:
root
|-- data: struct (nullable = true)
| |-- data_1000_xyz: struct (nullable = true)
| | |-- name_10000_xyz: struct (nullable = true)
| | | |-- label: string (nullable = true)
| | | |-- units: struct (nullable = true)
| | | | |-- A: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- date: string (nullable = true)
| | | | | | |-- num: string (nullable = true)
| | | | | | |-- val: long (nullable = true)
| | |-- name_1b: struct (nullable = true)
| | | |-- label: string (nullable = true)
| | | |-- units: struct (nullable = true)
| | | | |-- B: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- date: string (nullable = true)
| | | | | | |-- val: long (nullable = true)
| |-- data_1_a: struct (nullable = true)
| | |-- name_1a: struct (nullable = true)
| | | |-- label: string (nullable = true)
| | | |-- units: struct (nullable = true)
| | | | |-- A: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- date: string (nullable = true)
| | | | | | |-- val: long (nullable = true)
|-- id: long (nullable = true)
+-----------------------------------------------------------------------------------------------------------------------------+---+
|data |id |
+-----------------------------------------------------------------------------------------------------------------------------+---+
|{{{null, {[{2018, str, 4}, {2019, null, 5}, {2020, str, 6}]}}, {null, {[{2019, 2}, {2020, 3}]}}}, {{label_1, {[{2020, 1}]}}}}|1 |
+-----------------------------------------------------------------------------------------------------------------------------+---+
Required df:
+---+--------------+---------------+-------+-------+----+---+----+
|id |level_1       |level_2        |level_3|level_4|date|val|num |
+---+--------------+---------------+-------+-------+----+---+----+
|1  |data_1_a      |name_1a        |units  |A      |2020|1  |null|
|1  |data_1000_xyz |name_1b        |units  |B      |2019|2  |null|
|1  |data_1000_xyz |name_1b        |units  |B      |2020|3  |null|
|1  |data_1000_xyz |name_10000_xyz |units  |A      |2018|4  |str |
|1  |data_1000_xyz |name_10000_xyz |units  |A      |2019|5  |null|
|1  |data_1000_xyz |name_10000_xyz |units  |A      |2020|6  |str |
+---+--------------+---------------+-------+-------+----+---+----+
To reproduce the input df:
json_1 = """{"id":1,"data":{"data_1_a":{"name_1a":{"label":"label_1","units":{"A":[{"date":"2020","val":1}]}}},"data_1000_xyz":{"name_1b":{"label":null,"units":{"B":[{"date":"2019","val":2},{"date":"2020","val":3}]}},"name_10000_xyz":{"label":null,"units":{"A":[{"date":"2018","val":4,"num":"str"},{"date":"2019","val":5},{"date":"2020","val":6,"num":"str"}]}}}}}"""
df = spark.read.json(sc.parallelize([json_1]))
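(For the actual 10000 files, the read would presumably be a single call over the whole directory; the path here is an assumption:)
# Assumed path; Spark infers a single merged schema across all files
# df = spark.read.json("/path/to/json_dir/*.json")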
Python Pandas solution:
import json
import pandas as pd

# 1) flatten the json into dotted column names
df = pd.json_normalize(json.loads(json_1))
df_dic = df.to_dict('records')

# 2) split the dotted names into levels
data = []
for row in df_dic:
    k = {}
    for item in row.items():
        if item[0] == 'id':
            id = item[1]  # relies on 'id' appearing first in the record
        else:
            keys = item[0].split('.')
            k = {i: s for i, s in enumerate(keys)}
            k.update({'value': item[1]})
            k.update({'id': id})
            data.append(k)
df = (pd.DataFrame(data)[['id', 1, 2, 3, 4, 'value']]
      .rename(columns={1: 'level_1', 2: 'level_2', 3: 'level_3', 4: 'level_4'}))
df = df.loc[~df['level_4'].isnull()]  # drop label rows (no 5th level)

# 3) explode the lists of {date, val, num} records into rows
dfe = df.explode('value', ignore_index=True)

# 4) pop the value column, build a new frame from its dicts,
#    then join that frame back onto the exploded frame
output_df = dfe.join(pd.DataFrame([*dfe.pop('value')], index=dfe.index))
   id       level_1        level_2 level_3 level_4 date val num
0   1      data_1_a        name_1a   units       A 2020   1 NaN
1   1 data_1000_xyz        name_1b   units       B 2019   2 NaN
2   1 data_1000_xyz        name_1b   units       B 2020   3 NaN
3   1 data_1000_xyz name_10000_xyz   units       A 2018   4 str
4   1 data_1000_xyz name_10000_xyz   units       A 2019   5 NaN
5   1 data_1000_xyz name_10000_xyz   units       A 2020   6 str
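For comparison, here is a rough PySpark sketch of the same melt-then-explode idea: walk the inferred schema to enumerate the dynamic names, explode each units array with the level names as literals, and union the per-path frames. It assumes every level_2 struct has a units child and Spark >= 3.1 (for allowMissingColumns); it is the shape of a solution, not something tuned for 10000 names (the union plan gets very wide).
from functools import reduce
from pyspark.sql import functions as F

parts = []
for f1 in df.schema["data"].dataType.fields:               # level_1: data_1_a, ...
    for f2 in f1.dataType.fields:                          # level_2: name_1a, ...
        for f4 in f2.dataType["units"].dataType.fields:    # level_4: A, B, ...
            parts.append(
                df.select(
                    "id",
                    F.lit(f1.name).alias("level_1"),
                    F.lit(f2.name).alias("level_2"),
                    F.lit("units").alias("level_3"),
                    F.lit(f4.name).alias("level_4"),
                    # explode drops null/empty arrays, so absent paths vanish
                    F.explode(f"data.{f1.name}.{f2.name}.units.{f4.name}").alias("e"),
                ).select("id", "level_1", "level_2", "level_3", "level_4", "e.*")
            )

# allowMissingColumns=True fills num with null where an array lacks it
output_df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True), parts)
output_df.show(truncate=False)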
Useful links:
- How to flatten nested arrays by merging values in spark
- Is there a function in pyspark dataframe that is similar to pandas.io.json.json_normalize
- Automatically and Elegantly flatten DataFrame in Spark SQL
- Flatten nested array in Spark DataFrame
- https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
- Flatten a dynamic nested struct (struct inside struct) in PySpark
- https://towardsdatascience.com/nested-data-types-in-spark-3-1-663e5ed2f2aa
- https://towardsdatascience.com/flattening-json-records-using-pyspark-b83137669def
- https://docs.databricks.com/_static/notebooks/complex-nested-structured.html
- https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html
- https://docs.databricks.com/_static/notebooks/transform-complex-data-types-python.html
- https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html