
I have the following dataframe in Spark:

root
 |-- user_id: string (nullable = true)
 |-- payload: string (nullable = true)

in which payload is a JSON string with no fixed schema. Here are some sample rows:

{'user_id': '001','payload': '{"country":"US","time":"11111"}'}
{'user_id': '002','payload': '{"message_id":"8936716"}'}
{'user_id': '003','payload': '{"brand":"adidas","when":""}'}

I want to output the above data in JSON format with the payload flattened (basically just extracting the key-value pairs from payload and putting them at the root level), for example:

{'user_id': '001','country':'US','time':'11111'}
{'user_id': '002','message_id':'8936716'}
{'user_id': '003','brand':'adidas','when':''}

Stack Overflow flagged this as a duplicate of Flatten Nested Spark Dataframe, but it isn't. The difference here is that the payload column in my case is just a string, not a nested structure.

tykhelloworld

2 Answers


You can parse the payload JSON as a map<string,string> and add the user_id to the payload:

import pyspark.sql.functions as F

# input dataframe
df.show(truncate=False)
+-------+-------------------------------+
|user_id|payload                        |
+-------+-------------------------------+
|001    |{"country":"US","time":"11111"}|
|002    |{"message_id":"8936716"}       |
|003    |{"brand":"adidas","when":""}   |
+-------+-------------------------------+

df2 = df.select(
    F.to_json(
        F.map_concat(
            F.create_map(F.lit('user_id'), F.col('user_id')), 
            F.from_json('payload', 'map<string,string>')
        )
    ).alias('out')
)
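
One caveat, in case some rows carry a null or malformed payload: from_json returns null for unparseable input, and map_concat of a null map is null, so those rows would come out as null. A minimal sketch that guards against this (df3 is a hypothetical name, same df as above):

# Guard against null/unparseable payloads: coalesce the parsed map
# with an empty map so map_concat still emits the user_id for that row.
df3 = df.select(
    F.to_json(
        F.map_concat(
            F.create_map(F.lit('user_id'), F.col('user_id')),
            F.coalesce(
                F.from_json('payload', 'map<string,string>'),
                F.from_json(F.lit('{}'), 'map<string,string>')
            )
        )
    ).alias('out')
)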

df2.show(truncate=False)
+-----------------------------------------------+
|out                                            |
+-----------------------------------------------+
|{"user_id":"001","country":"US","time":"11111"}|
|{"user_id":"002","message_id":"8936716"}       |
|{"user_id":"003","brand":"adidas","when":""}   |
+-----------------------------------------------+

To write it to a JSON file, you can do:

df2.coalesce(1).write.text('filepath')
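
Since each row of out is already a complete JSON string, the text writer produces line-delimited JSON. As a quick sanity check (same 'filepath' as above), the JSON reader recovers the flattened columns:

# each line is a full JSON object, so this infers the flattened schema
spark.read.json('filepath').printSchema()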
mck

This is how I finally solved the problem:

from pyspark.sql.functions import col, from_json

# infer a schema covering every payload, parse the strings, then flatten the struct
json_schema = spark.read.json(source_parquet_df.rdd.map(lambda row: row.payload)).schema
new_df = source_parquet_df.withColumn('payload_json_obj', from_json(col('payload'), json_schema)).drop(source_parquet_df.payload)
flat_df = new_df.select([c for c in new_df.columns if c != 'payload_json_obj'] + ['payload_json_obj.*'])
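
With the three sample rows from the question, the inferred schema unions every payload key, and a row that lacks a key simply gets null in that column. Roughly:

flat_df.printSchema()
# root
#  |-- user_id: string (nullable = true)
#  |-- brand: string (nullable = true)
#  |-- country: string (nullable = true)
#  |-- message_id: string (nullable = true)
#  |-- time: string (nullable = true)
#  |-- when: string (nullable = true)

When writing with flat_df.write.json(...), Spark 3+ drops null fields by default (spark.sql.jsonGenerator.ignoreNullFields), so the output lines should match the desired format.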
tykhelloworld