
I have a JSON input which consists of an array that will be exploded as follows:

from pyspark.sql.functions import col, explode_outer

new_df = df \
    .withColumn("x", explode_outer(col("x"))) \
    .select(
        col("x.p").alias("xp"),
        col("x.q").alias("xq"),
        col("x.r.l.g").alias("xrlg"),
        col("x.r.m.f").alias("xrmf"),
        col("x.r.n").alias("xrn"),
        col("x.r.o").alias("xro"),
        col("x.r.s").alias("xrs"),
    )

Sometimes the input file is empty or does not contain the JSON key 'x'. In those cases the PySpark code fails with the error: cannot resolve 'x' given input columns: [].

Is there a way I can keep all the columns of this table and populate them all as NULL if this key is not present in the input JSON?

JohnWick

2 Answers


Simply check whether the column exists in the DataFrame using a has_column helper function (reproduced below for reference).

from pyspark.sql.functions import col, explode_outer, lit, when
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException


def has_column(df, name):
    # df[name] raises AnalysisException when the column cannot be resolved
    try:
        df[name]
        return True
    except AnalysisException:
        return False


if has_column(df, "x"):
    new_df = df.withColumn("x", explode_outer(col("x"))) \
        .select(
            col("x.p").alias("xp"),
            col("x.q").alias("xq"),
            col("x.r.l.g").alias("xrlg"),
            col("x.r.m.f").alias("xrmf"),
            col("x.r.n").alias("xrn"),
            col("x.r.o").alias("xro"),
            col("x.r.s").alias("xrs"))
else:
    # Key 'x' is absent: add every output column as a NULL string.
    # The parentheses let the method chain span several lines.
    new_df = (
        df.withColumn("xp", lit(None).cast("string"))
          .withColumn("xq", lit(None).cast("string"))
          .withColumn("xrlg", lit(None).cast("string"))
          .withColumn("xrmf", lit(None).cast("string"))
          .withColumn("xrn", lit(None).cast("string"))
          .withColumn("xro", lit(None).cast("string"))
          .withColumn("xrs", lit(None).cast("string"))
    )

If you also want to check whether the inner JSON keys are present, you can do something like the following for each column:

df.withColumn(
   "xp", 
   when(
       lit(has_column(df, "x.p")),
       col("x.p")
   ).otherwise(lit(None).cast("string")))
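
As an alternative to the try/except check, the existence of a nested key can also be decided by walking the schema itself. The following is only a sketch: `has_path` and `example_schema` are hypothetical names, and the dict is a hand-written stand-in (with assumed string types) for what `df.schema.jsonValue()` returns in PySpark. Note that the comparison here is case-sensitive, whereas Spark's default column resolution is not.

```python
def has_path(schema_json, path):
    """Return True if the dotted `path` exists in a Spark schema given as a dict."""
    node = schema_json
    for name in path.split("."):
        # Field access passes through arrays: x.p is valid on array<struct<p>>.
        while isinstance(node, dict) and node.get("type") == "array":
            node = node["elementType"]
        # Only structs have named fields; anything else ends the walk.
        if not (isinstance(node, dict) and node.get("type") == "struct"):
            return False
        for field in node["fields"]:
            if field["name"] == name:
                node = field["type"]
                break
        else:
            return False
    return True


# Hand-written stand-in for df.schema.jsonValue(); field types are assumed.
example_schema = {
    "type": "struct",
    "fields": [{
        "name": "x",
        "type": {
            "type": "array",
            "elementType": {
                "type": "struct",
                "fields": [
                    {"name": "p", "type": "string", "nullable": True, "metadata": {}},
                    {"name": "r", "type": {"type": "struct", "fields": [
                        {"name": "n", "type": "string", "nullable": True, "metadata": {}},
                    ]}, "nullable": True, "metadata": {}},
                ],
            },
            "containsNull": True,
        },
        "nullable": True,
        "metadata": {},
    }],
}

print(has_path(example_schema, "x.p"))    # key present
print(has_path(example_schema, "x.r.n"))  # nested key present
print(has_path(example_schema, "x.q"))    # key missing
```

With a real DataFrame the call would be `has_path(df.schema.jsonValue(), "x.r.n")`; because it only inspects the schema, it never triggers column resolution.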

Another solution is to provide a schema when reading the JSON file, as suggested by Hristo Iliev:

schema = StructType([
    # 'x' is an array of structs in the question, hence ArrayType
    StructField("x", ArrayType(StructType([
        StructField("p", StringType()),
        StructField("q", StringType()),
        # .... (remaining fields)
    ])))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON", multiLine=True)

Drashti Dobariya
  • Thanks for your insights @Drashti Dobariya. However, this has_column function can't be applied on the column 'x' here. If done so, there will be an error such as this: `Generators are not supported when it's nested in expressions, but got: CASE WHEN true THEN generatorouter(explode(x)) ELSE CAST(NULL AS STRING) END`. Is there a way to apply this on the column being exploded? – JohnWick Oct 06 '21 at 09:21
  • Can you add the exact code that you tried? – Drashti Dobariya Oct 06 '21 at 10:05
  • Actually I was able to resolve it by using the 1st method in your answer for the explode column and the 2nd method in your answer for the nested JSONs within this array. Thanks! – JohnWick Oct 06 '21 at 11:55
  • hi @DrashtiDobariya , can you please look into this question if possible - https://stackoverflow.com/questions/69475845/is-there-any-way-to-retain-the-state-of-multiple-expandable-mat-grid-after-refre ? – angular_code Oct 07 '21 at 07:15
  • I tried using the inline function but it evaluates the when condition even if the column is not present. ```.withColumn("abc", when( \ lit(has_column(df, "pqr.stu.i")), element_at("pqr.stu.i", -1)) \ .otherwise(lit(None).cast("string")))``` Even though there is no i under pqr.stu, it should have become NULL. But instead it is throwing an error suggesting that the second half of when is also getting executed. – JohnWick Oct 08 '21 at 10:58
  • It is difficult to debug with the above statement. Create another question and mention the error in detail and also the code that you have written – Drashti Dobariya Oct 12 '21 at 08:19
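
On the error described in the last comments: Spark resolves every column referenced in an expression during analysis, regardless of what the `when` condition evaluates to, so a reference to a missing column fails even when its branch would never be taken. One possible workaround is to make the decision in Python before the expression is built. The sketch below assumes the result is passed to `df.selectExpr`; `build_select_exprs`, `wanted` and `present` are hypothetical names, and the `exists` callback stands in for a check such as `has_column`.

```python
def build_select_exprs(columns, exists):
    """Map {source_path: alias} to SQL expression strings.

    Paths for which `exists(path)` is False are replaced by a typed NULL,
    so Spark is never asked to resolve a column that is not there.
    """
    exprs = []
    for path, alias in columns.items():
        if exists(path):
            exprs.append(f"{path} AS {alias}")
        else:
            exprs.append(f"CAST(NULL AS STRING) AS {alias}")
    return exprs


# Illustrative input: only x.p is assumed to exist in the DataFrame.
wanted = {"x.p": "xp", "x.r.m.f": "xrmf"}
present = {"x.p"}

exprs = build_select_exprs(wanted, lambda p: p in present)
print(exprs)
```

With a real DataFrame this would be used roughly as `df.selectExpr(*build_select_exprs(wanted, lambda p: has_column(df, p)))`, applied after the explode step when `x` itself is present.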

Another option is to load the file with the schema:

sqlContext.read.format('json').schema(schema_var).load(filename)

but it does require you to provide the full possible schema in schema_var to work.
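
Writing the full schema by hand can be tedious for deeply nested keys. As a rough sketch, the schema can be generated from the dotted paths used in the question. Assumptions here: every leaf is a string, `x` is an array of structs, and `schema_json_from_paths` is a hypothetical helper. The resulting dict follows Spark's JSON schema layout, so in PySpark it could be turned into a schema with `StructType.fromJson(...)`.

```python
def schema_json_from_paths(array_column, leaf_paths, leaf_type="string"):
    """Build a Spark-style schema dict for an array-of-structs column."""

    def field(name, dtype):
        return {"name": name, "type": dtype, "nullable": True, "metadata": {}}

    # First collect the paths into a nested tree, e.g.
    # {"p": None, "r": {"l": {"g": None}}}; None marks a leaf.
    tree = {}
    for path in leaf_paths:
        node = tree
        parts = path.split(".")
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = None

    def to_struct(subtree):
        return {"type": "struct", "fields": [
            field(name, leaf_type if child is None else to_struct(child))
            for name, child in subtree.items()
        ]}

    array = {"type": "array", "elementType": to_struct(tree), "containsNull": True}
    return {"type": "struct", "fields": [field(array_column, array)]}


# Paths taken from the select in the question; string leaf types are assumed.
schema_json = schema_json_from_paths(
    "x", ["p", "q", "r.l.g", "r.m.f", "r.n", "r.o", "r.s"]
)
element = schema_json["fields"][0]["type"]["elementType"]
print([f["name"] for f in element["fields"]])
```

The dict could then be used as `spark.read.schema(StructType.fromJson(schema_json)).json(filename)`, which gives an empty DataFrame with all expected columns when the input file has no records.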

Twilight