I have a PySpark dataframe with three columns: json, date, and object_id:

-----------------------------------------------------------------------------------------
|json                                                              |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------

Now I have a list of variables: [a.c.60, a.n.60, a.d, g.h]. I need to extract only these variables from the json column of the above dataframe and add them as new columns with their respective values.

So in the end, the dataframe should look like:

-------------------------------------------------------------------------------------------------------
|json                                                    |date      |object_id|a.c.60|a.n.60|a.d |g.h |
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123   |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123   |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}  |2020-08-03|xyz123   |null  |null  |null|0   |
-------------------------------------------------------------------------------------------------------

Please help me get this result dataframe. The main problem I am facing is that there is no fixed structure for the incoming JSON data. The JSON can be arbitrarily nested, but I need to extract only the given four variables. I have achieved this in Pandas by flattening out the JSON string and then extracting the four variables, but in Spark it is getting difficult.
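
For reference, a rough sketch of that Pandas approach (assuming json.loads plus pd.json_normalize for the flattening; the column and variable names match the example above):

import json
import pandas as pd

pdf = pd.DataFrame({
    "json": ['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
             '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
             '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
    "date": ["2020-08-01", "2020-08-02", "2020-08-03"],
    "object_id": ["xyz123", "xyz123", "xyz123"],
})

# json_normalize flattens nested keys into dotted column names like "a.c.60".
flat = pd.json_normalize([json.loads(s) for s in pdf["json"]])

# reindex keeps only the four wanted variables, filling missing paths with NaN.
wanted = ["a.c.60", "a.n.60", "a.d", "g.h"]
result = pdf.join(flat.reindex(columns=wanted))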

1 Answer

There are two ways to do it:

  1. Use the get_json_object function, like this:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

# Each JSON path yields a string value, or null when the path is absent.
df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))

will give:

>>> df3.show()
+------+------+----+----+
|a_c_60|a_n_60| a_d| g_h|
+------+------+----+----+
|     0|  null|0.01|null|
|  null|     0|0.01|null|
|  null|  null|null|   0|
+------+------+----+----+
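
To add these as columns on the question's original dataframe rather than building a standalone one, the same expressions can be appended to a select over the existing columns. A sketch, assuming the JSON strings sit in a column named json as in the question:

paths = {"a_c_60": "$.a.c.60", "a_n_60": "$.a.n.60", "a_d": "$.a.d", "g_h": "$.g.h"}

# Keep json, date and object_id, and append one extracted column per JSON path.
df_out = df.select("*", *[F.get_json_object("json", p).alias(name)
                          for name, p in paths.items()])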
  2. Declare the schema explicitly (only the necessary fields), convert the JSON into structs using the from_json function with that schema, and then extract individual values from the structures. This could be more performant than JSON path extraction:
from pyspark.sql.types import *
import pyspark.sql.functions as F

aSchema = StructType([
    StructField("c", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("n", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("d", DoubleType(), True),
])
gSchema = StructType([
    StructField("h", DoubleType(), True)
])

schema = StructType([
    StructField("a", aSchema, True),
    StructField("g", gSchema, True)
])

df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()

will give:

+------+------+----+----+
|a.c.60|a.n.60| a.d| g.h|
+------+------+----+----+
|   0.0|  null|0.01|null|
|  null|   0.0|0.01|null|
|  null|  null|null| 0.0|
+------+------+----+----+
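
Note that from_json returns typed values (hence 0.0 above), whereas get_json_object always returns strings. To append the typed values to the full three-column dataframe, one sketch (again assuming the question's json column name):

parsed = df.withColumn("data", F.from_json("json", schema))

df_out = parsed.select("json", "date", "object_id",
                       parsed.data.a.c["60"].alias("a_c_60"),
                       parsed.data.a.n["60"].alias("a_n_60"),
                       parsed.data.a.d.alias("a_d"),
                       parsed.data.g.h.alias("g_h"))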
Alex Ott
  • Thanks Alex Ott, the first method worked for me. I did not try the second one though. Thanks a lot for your help :) – arin1405 Aug 21 '20 at 05:33
  • Hi @Alex Ott What will be code if the dataframe is like (List of dictionaries): df = spark.createDataFrame(['[{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]', '[{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]', '[{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}]'], StringType()) – Suraj Tripathi May 27 '22 at 15:02
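
For the list-of-dictionaries case in the last comment, one option (a sketch, not part of the original answer) is to wrap the same struct schema in an ArrayType, parse with from_json, and take the first element:

arraySchema = ArrayType(schema)  # reuse the struct schema defined above

dfA = spark.createDataFrame(['[{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]',
                             '[{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}]',
                             '[{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}]'],
                            StringType())

# element_at(..., 1) picks the first struct out of the parsed array (Spark >= 2.4).
dfA2 = dfA.select(F.element_at(F.from_json("value", arraySchema), 1).alias("data")).select("data.*")
dfA2.select(dfA2.a.c["60"], dfA2.a.n["60"], dfA2.a.d, dfA2.g.h).show()

If each array can contain more than one object, F.explode on the parsed column would give one row per object instead.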