1

I have a data with below Schema: index attribute is Struct --> with array --> each array element inside struct

root
 |-- id_num: string (nullable = true)
 |-- indexes: struct (nullable = true)
 |    |-- customer_rating: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- reputation: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- visibility: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

I want to translate the schema into below format and get the data values into corresponding columns

root
 |-- id_num: string (nullable = true)
 |-- indexes_type: string (nullable = true)    --> this field hold indexes struct elements as a row
 |-- data_sufficiency_indicator: boolean (nullable = true)
 |-- value: double (nullable = true)
 |-- version: string (nullable = true)
 |-- low_value_reason: string (nullable = true)  --> each element in the array becomes a new row

Here is the sample input data in json format:

{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}

Expected Output

id_num  |   indexes_type    |   version |   data_sufficiency_indicator | value  |   low_value_reason
==============================================================================================================
9999        visibility          2.0             true                    2.16        low scores from reviews_and_visits
9999        visibility          2.0             true                    2.16        low scores from online_presence
9999        customer_rating     2.0             false
9999        reputation          2.0             false
8888        visibility          2.0             true                    2.71        low scores from reviews_and_visits  
8888        visibility          2.0             true                    2.71        low scores from online_presence
8888        customer_rating     2.0             false
7898        visibility          2.0             true                    3.06
7898        customer_rating     2.0             false       
7898        reputation          2.0             false

Any help on this usecase is much appreciated. Also is it possible to get the output without hardcoding the struct values in the code, since they can extend beyond what is in the example.

aj2713
  • 21
  • 3
  • are you able to control the data loading, i.e. specify schema when using spark.read.json(..)? – jxc Dec 18 '20 at 03:42
  • @jxc Not sure if i fully understood your question. i guess i can do that. Right now i am loading the full json file and converting to parquet format, which then gives me the above Schema. Can you help me on this question – aj2713 Dec 18 '20 at 11:24

1 Answers1

1

You can set the column indexes as MapType instead of StructType by explicitly specifying the schema when loading the dataframe with spark.read.json(), see below:

schema = "id_num string,indexes map<string,array<struct<data_sufficiency_indicator:boolean,low_value_reason:array<string>,value:double,version:string>>>"

df = spark.read.json("/path/to/jsons", schema=schema)

df.printSchema()
root
 |-- id_num: string (nullable = true)
 |-- indexes: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

and then do select and explode_outer/inline_outer multiple times to get the desired result:

df_new = df.selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)") \
    .selectExpr("*", "inline_outer(vals)") \
    .selectExpr(
        "id_num",
        "indexes_type",
        "version",
        "data_sufficiency_indicator",
        "value",
        "explode_outer(low_value_reason) as low_value_reason"
    )

df_new.show(truncate=False)
+------+---------------+-------+--------------------------+-----+----------------------------------+
|id_num|indexes_type   |version|data_sufficiency_indicator|value|low_value_reason                  |
+------+---------------+-------+--------------------------+-----+----------------------------------+
|1234  |visibility     |2.0    |true                      |2.16 |low scores from reviews_and_visits|
|1234  |visibility     |2.0    |true                      |2.16 |low scores from online_presence   |
|1234  |customer_rating|2.0    |false                     |null |null                              |
|1234  |reputation     |2.0    |false                     |null |null                              |
|5678  |visibility     |2.0    |true                      |2.71 |low scores from reviews_and_visits|
|5678  |visibility     |2.0    |true                      |2.71 |low scores from online_presence   |
|5678  |customer_rating|2.0    |false                     |null |null                              |
|9876  |visibility     |2.0    |true                      |3.06 |null                              |
|9876  |customer_rating|2.0    |false                     |null |null                              |
|9876  |reputation     |2.0    |false                     |null |null                              |
+------+---------------+-------+--------------------------+-----+----------------------------------+

BTW. I changed data_id to id_num in your sample JSON which I supposed is your typo. if not, just add data_id string into schema and then use coalesce(id_num,data_id) to get the final id_num column.

On the other hand, you can also try using from_json/to_json function after loading the dataframe without specifying schema, see a similar example here.

jxc
  • 13,553
  • 4
  • 16
  • 34