
I am trying to write logic that returns an empty column when a column does not exist in the dataframe.

The schema changes very frequently: sometimes the whole struct is missing (temp1), or an array inside a struct is missing (suffix).

Schema looks like this:

    root
     |-- id: string (nullable = true)
     |-- temp: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- temp1: struct (nullable = true)
     |    |    |    |-- code: string (nullable = true)
     |    |    |    |-- code1: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |    |    |-- temp2: struct (nullable = true)
     |    |    |    |-- name1: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |    |    |    |-- suffix: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |-- timestamp: timestamp (nullable = true)
    

Or like this:

    root
     |-- id: string (nullable = true)
     |-- temp: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- temp2: struct (nullable = true)
     |    |    |    |-- name1: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |-- timestamp: timestamp (nullable = true)

When I try the logic below against the second schema, I get an exception that the struct is not found:

    import scala.util.Try
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    def has_Column(df: DataFrame, path: String) = Try(df(path)).isSuccess

    df.withColumn("id", col("id")).
      withColumn("tempLn", explode(col("temp"))).
      withColumn("temp1_code1", when(lit(has_Column(df, "tempLn.temp1.code1")), concat_ws(" ", col("tempLn.temp1.code1"))).otherwise(lit("").cast("string"))).
      withColumn("temp2_suffix", when(lit(has_Column(df, "tempLn.temp2.suffix")), concat_ws(" ", col("tempLn.temp2.suffix"))).otherwise(lit("").cast("string")))

Error:

    org.apache.spark.sql.AnalysisException: No such struct field temp1;

1 Answer


You need to do the existence check outside the select/withColumn... methods. Because you reference the column in the then branch of the when expression, Spark tries to resolve it during analysis of the query, even if the condition is always false.
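
For example, even a branch that can never be taken must still resolve during analysis (a minimal sketch; `does_not_exist` is a hypothetical column name):

    // Fails with AnalysisException although the condition is always false:
    // the analyzer resolves every column reference before anything runs.
    df.withColumn("x", when(lit(false), col("does_not_exist")).otherwise(lit("")))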

So you'll need to do the check like this:

    val result = if (has_Column(df, "tempLn.temp1.code1"))
      df.withColumn("temp1_code1", concat_ws(" ", col("tempLn.temp1.code1")))
    else
      df.withColumn("temp1_code1", lit(""))

To do it for multiple columns, you can use `foldLeft` like this:

    // Fold over (path, newColumn) pairs, adding each column only
    // when the path actually resolves against the current schema.
    val df1 = Seq(
      ("tempLn.temp1.code1", "temp1_code1"),
      ("tempLn.temp2.suffix", "temp2_suffix")
    ).foldLeft(df) {
      case (acc, (field, newCol)) =>
        if (has_Column(acc, field))
          acc.withColumn(newCol, concat_ws(" ", col(field)))
        else
          acc.withColumn(newCol, lit(""))
    }
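
Note that the `tempLn.*` paths only resolve once the array has been exploded, so the `df` passed to `foldLeft` is assumed to already carry the exploded column, e.g. (with `rawDf` as a hypothetical name for the original dataframe):

    // has_Column can only see tempLn.* after the explode:
    val df = rawDf.withColumn("tempLn", explode(col("temp")))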