How to tidy a DataFrame (JSON) with dynamic nested structs / arrays in PySpark?

I have 10,000 JSON files; each has static and dynamic field names as described below.

  • Static names: data, label, units, date, val, num (can be hardcoded)
  • Dynamic names: data_1_a, data_1000_xyz, name_1a, name_1b, name_10000_xyz, A, B (cannot be hardcoded, as there are up to 10,000 names / data sub-categories)

Input df:

root
 |-- data: struct (nullable = true)
 |    |-- data_1000_xyz: struct (nullable = true)
 |    |    |-- name_10000_xyz: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- A: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- num: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |    |    |-- name_1b: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- B: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |    |-- data_1_a: struct (nullable = true)
 |    |    |-- name_1a: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- A: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)


+-----------------------------------------------------------------------------------------------------------------------------+---+
|data                                                                                                                         |id |
+-----------------------------------------------------------------------------------------------------------------------------+---+
|{{{null, {[{2018, str, 4}, {2019, null, 5}, {2020, str, 6}]}}, {null, {[{2019, 2}, {2020, 3}]}}}, {{label_1, {[{2020, 1}]}}}}|1  |
+-----------------------------------------------------------------------------------------------------------------------------+---+

Required df:

+---+--------------+----------------+-------+-------+----+---+----+
|id |level_1       |level_2         |level_3|level_4|date|val|num |
+---+--------------+----------------+-------+-------+----+---+----+
|1  |data_1_a      |name_1a         |units  |A      |2020|1  |null|
|1  |data_1000_xyz |name_1b         |units  |B      |2019|2  |null|
|1  |data_1000_xyz |name_1b         |units  |B      |2020|3  |null|
|1  |data_1000_xyz |name_10000_xyz  |units  |A      |2018|4  |str |
|1  |data_1000_xyz |name_10000_xyz  |units  |A      |2019|5  |null|
|1  |data_1000_xyz |name_10000_xyz  |units  |A      |2020|6  |str |
+---+--------------+----------------+-------+-------+----+---+----+

To reproduce the input df:

json_1 = """{"id":1,"data":{"data_1_a":{"name_1a":{"label":"label_1","units":{"A":[{"date":"2020","val":1}]}}},"data_1000_xyz":{"name_1b":{"label":null,"units":{"B":[{"date":"2019","val":2},{"date":"2020","val":3}]}},"name_10000_xyz":{"label":null,"units":{"A":[{"date":"2018","val":4,"num":"str"},{"date":"2019","val":5},{"date":"2020","val":6,"num":"str"}]}}}}}"""
df = spark.read.json(sc.parallelize([json_1]))
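
One possible PySpark sketch (an outline under stated assumptions, not a definitive implementation): because the struct field names are dynamic, each nesting level can be cast to a map with to_json/from_json, which turns the unknown names into explodable key/value pairs; nested objects survive as JSON strings in the map values. This assumes Spark 3.x and that every leaf array has the date/num/val element shape:

    from pyspark.sql import functions as F
    from pyspark.sql.types import MapType, StringType

    MAP = MapType(StringType(), StringType())  # map values stay as raw JSON strings

    # level_1: explode the dynamic data_* keys
    lvl1 = df.select(
        "id",
        F.explode(F.from_json(F.to_json("data"), MAP)).alias("level_1", "v1"),
    )

    # level_2: explode the dynamic name_* keys
    lvl2 = lvl1.select(
        "id", "level_1",
        F.explode(F.from_json("v1", MAP)).alias("level_2", "v2"),
    )

    # level_3 is the static `units` struct; its dynamic A/B keys become level_4
    lvl4 = lvl2.select(
        "id", "level_1", "level_2",
        F.lit("units").alias("level_3"),
        F.explode(
            F.from_json(F.get_json_object("v2", "$.units"), MAP)
        ).alias("level_4", "v4"),
    )

    # each v4 is a JSON array of {date, num, val} records: parse it, then explode
    elem = "array<struct<date:string,num:string,val:long>>"
    output = (
        lvl4.withColumn("e", F.explode(F.from_json("v4", elem)))
            .select("id", "level_1", "level_2", "level_3", "level_4",
                    "e.date", "e.val", "e.num")
    )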

Python Pandas solution:

    import json
    import pandas as pd

    # 1) flatten the JSON (json_normalize expects a dict, not a string)
    df = pd.json_normalize(json.loads(json_1))
    df_dic = df.to_dict('records')

    # 2) split the dotted column names into levels
    data = []
    for row in df_dic:
        row_id = row.pop('id')
        for col, value in row.items():
            keys = col.split('.')
            k = {i: s for i, s in enumerate(keys)}
            k.update({'value': value, 'id': row_id})
            data.append(k)

    df = (pd.DataFrame(data)[['id', 1, 2, 3, 4, 'value']]
          .rename(columns={1: 'level_1', 2: 'level_2', 3: 'level_3', 4: 'level_4'}))
    # keep only rows that reach level_4 (the units arrays, not the label fields)
    df = df.loc[~df['level_4'].isnull()]

    # 3) explode the list of records into one row per record
    dfe = df.explode('value', ignore_index=True)

    # 4) pop the value column, build a new frame from its dicts,
    #    then join it back onto the exploded frame
    output_df = dfe.join(pd.DataFrame([*dfe.pop('value')], index=dfe.index))
    
    
        id  level_1         level_2         level_3 level_4 date    val num
    0   1   data_1_a        name_1a         units   A      2020     1   NaN
    1   1   data_1000_xyz   name_1b         units   B      2019     2   NaN
    2   1   data_1000_xyz   name_1b         units   B      2020     3   NaN
    3   1   data_1000_xyz   name_10000_xyz  units   A      2018     4   str
    4   1   data_1000_xyz   name_10000_xyz  units   A      2019     5   NaN
    5   1   data_1000_xyz   name_10000_xyz  units   A      2020     6   str
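
To cover all 10,000 files with the same pandas logic, one hypothetical sketch (the glob pattern and the tidy_one helper wrapping steps 1-4 above are assumptions, not part of the original post) would tidy each file and concatenate:

    import glob
    import json
    import pandas as pd

    frames = []
    for path in glob.glob("data/*.json"):   # assumed location of the JSON files
        with open(path) as f:
            doc = json.load(f)
        frames.append(tidy_one(doc))        # tidy_one: hypothetical wrapper around steps 1-4

    output_df = pd.concat(frames, ignore_index=True)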
