I need to capture any JSON path in the source data that doesn't match the JSON paths hard-coded in my code. The source JSON I receive is huge and its schema is unpredictable.

The schema varies as in the sample JSON values below:

JSON Value 1: (Known Schema)

main.pull.notify.roit.lerk

[{
    "main" : {
        "pull" :{
            "notify" : {
                "roit" : {
                    "lerk" : "value_a"
                }
            }
        }
    }
}]

JSON Value 2: (Unknown Schema)

main.pull.notify.roit.late_lerk

[{
    "main" : {
        "pull" :{
            "notify" : {
                "roit" : {
                    "late_lerk" : "value_a"
                }
            }
        }
    }
}]

Below is the code I currently use to capture the JSON values based on the known schema:

df = df.withColumn('lerk_value', when(df.main.pull.notify.roit.lerk.isNotNull(), df["main.pull.notify.roit.lerk"]).otherwise(""))

Suppose that on today's run the source data does not contain the known schema, because the JSON in the source is unpredictable, but the unknown schema arrives with the same valid data (value_a). My code then fails with a schema mismatch.

Is it possible to create a fail-safe that captures the path of the unknown schema (for example main.pull.notify.roit.late_lerk), prints it, and lets the code continue?

Help is much appreciated!!

Thanks in advance.

Naveen B

1 Answer

If there is always exactly one column under roit, you can do the following:

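from pyspark.sql import functions as F

# Field names directly under roit, whatever they happen to be in this run
cols = df.select("main.pull.notify.roit.*").columns
df = df.withColumn('lerk_value', F.when(df[f"main.pull.notify.roit.{cols[0]}"].isNotNull(),
                                        df[f"main.pull.notify.roit.{cols[0]}"]).otherwise(""))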

Otherwise, if the intention is to select a particular column only when it is present, first check whether the column exists:

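from pyspark.sql.utils import AnalysisException

def has_column(df, col):
    # Resolving a column that is not in the schema raises AnalysisException
    try:
        df[col]
        return True
    except AnalysisException:
        return False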

if has_column(df, "main.pull.notify.roit.lerk"):
    df = df.withColumn('lerk_value', F.when(df.main.pull.notify.roit.lerk.isNotNull(), 
                                            df["main.pull.notify.roit.lerk"]).otherwise(""))
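To report paths the code does not know about, as the question asks, one option is to walk the DataFrame schema and compare every leaf path against the known list. The following is only a sketch under stated assumptions: `leaf_paths` and `unknown_paths` are hypothetical helper names, the functions work on the plain dict returned by `df.schema.jsonValue()`, and a hand-built dict stands in for a real DataFrame so the snippet runs without Spark. Arrays of structs are treated as leaves here.

```python
def leaf_paths(schema_json, prefix=""):
    """Yield the dotted path of every leaf field in a Spark schema dict."""
    for field in schema_json["fields"]:
        path = f"{prefix}{field['name']}"
        dtype = field["type"]
        # Nested structs are dicts with type == "struct"; recurse into them
        if isinstance(dtype, dict) and dtype.get("type") == "struct":
            yield from leaf_paths(dtype, prefix=path + ".")
        else:
            yield path


def unknown_paths(schema_json, known_paths):
    """Return the leaf paths present in the schema but not in known_paths."""
    return sorted(set(leaf_paths(schema_json)) - set(known_paths))


# Stand-in for df.schema.jsonValue() when the source contains JSON Value 2
sample_schema = {"type": "struct", "fields": [
    {"name": "late_lerk", "type": "string", "nullable": True, "metadata": {}}]}
for name in ["roit", "notify", "pull", "main"]:
    sample_schema = {"type": "struct", "fields": [
        {"name": name, "type": sample_schema, "nullable": True, "metadata": {}}]}

known = {"main.pull.notify.roit.lerk"}
print(unknown_paths(sample_schema, known))
# prints ['main.pull.notify.roit.late_lerk']
```

In a job you would pass `df.schema.jsonValue()` instead of `sample_schema`, print or log the result, and then continue with the known columns guarded by has_column.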

Alternative if the column names of the different schemas are known in advance

When you read the data, you can supply the schema explicitly. Any column that does not exist in the data is then populated with null, which lets you chain two .when() conditions to get the desired output.

Your current known schema looks like:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('main', StructType([
        StructField('pull', StructType([
            StructField('notify', StructType([
                StructField('roit', StructType([
                    StructField('lerk', StringType(), True)]), 
                True)]), 
            True)]), 
        True)]), 
    True)
])

Now add the late_lerk column:

schema = StructType([
    StructField('main', StructType([
        StructField('pull', StructType([
            StructField('notify', StructType([
                StructField('roit', StructType([
                    StructField('lerk', StringType(), True),
                    StructField('late_lerk', StringType(), True)]), 
                True)]), 
            True)]), 
        True)]), 
    True)
])

A DataFrame read with this schema contains both columns, with null in whichever one is missing from the data. You can therefore do the following:

df = df.withColumn('lerk_value', F.when(df.main.pull.notify.roit.lerk.isNotNull(), df["main.pull.notify.roit.lerk"])\
                           .when(df.main.pull.notify.roit.late_lerk.isNotNull(), df["main.pull.notify.roit.late_lerk"])\
                           .otherwise(""))

Complete code

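from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([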
    StructField('main', StructType([
        StructField('pull', StructType([
            StructField('notify', StructType([
                StructField('roit', StructType([
                    StructField('lerk', StringType(), True),
                    StructField('late_lerk', StringType(), True)]), 
                True)]), 
            True)]), 
        True)]), 
    True)
])
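# file_path is the location of the source JSON; multiline handles records that span several lines
df = spark.read.format("json").schema(schema).option("multiline", "true").load(file_path)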

df = df.withColumn('lerk_value', F.when(df.main.pull.notify.roit.lerk.isNotNull(), df["main.pull.notify.roit.lerk"])\
                           .when(df.main.pull.notify.roit.late_lerk.isNotNull(), df["main.pull.notify.roit.late_lerk"])\
                           .otherwise(""))
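
Writing the nested StructType by hand gets verbose as more candidate keys are added. As a rough sketch, the schema can instead be generated from a list of dotted paths. `schema_json_from_paths` is a hypothetical helper, every leaf is assumed to be a nullable string, and the result is a plain dict in the layout that `StructType.fromJson` accepts, so the snippet itself runs without Spark.

```python
def schema_json_from_paths(paths, leaf_type="string"):
    """Build a Spark-style schema dict (all fields nullable) from dotted leaf paths."""
    # First collect the paths into a nested dict; leaves end up as empty dicts
    tree = {}
    for path in paths:
        node = tree
        for part in path.split("."):
            node = node.setdefault(part, {})

    def to_struct(node):
        fields = []
        for name, child in node.items():
            fields.append({
                "name": name,
                # Non-empty children are nested structs, empty ones are leaves
                "type": to_struct(child) if child else leaf_type,
                "nullable": True,
                "metadata": {},
            })
        return {"type": "struct", "fields": fields}

    return to_struct(tree)


schema_json = schema_json_from_paths([
    "main.pull.notify.roit.lerk",
    "main.pull.notify.roit.late_lerk",
])
# With PySpark available (assumption): schema = StructType.fromJson(schema_json)
```

The resulting schema has the same shape as the hand-written one above, with lerk and late_lerk side by side under roit.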
viggnah
  • True, I can add the new key to my schema if I know there's a key `late_lerk`. But let's assume we only know the `lerk` key and don't know that a key `late_lerk` came into our source on today's run. Is there a way to identify it? – Naveen B Aug 03 '22 at 10:41
  • @NaveenB, I have added a couple of alternatives to my answer – viggnah Aug 03 '22 at 11:28
  • Thanks @viggnah for your time and opinions! I'll try this solution out. Also, out of curiosity, what if we get more than one column under `roit`? – Naveen B Aug 03 '22 at 18:02
  • @NaveenB, then you can loop over the columns and use `cols[i]`. Also keep in mind that this won't work if the columns are nested. – viggnah Aug 04 '22 at 05:17