
I'm struggling to create a udf to extract some column data. The column is tricky since sometimes it's a string but in many cases it can be a struct. I want to handle only the cases where the column is a struct and extract the needed information from it.

Assuming this example:

SELECT annoyingCol.data FROM SomeDf

annoyingCol.data can be either a string or a struct, and I want to avoid getting an error like this one: need struct type but got string;. I'm wondering if I can just create a udf which checks the column type, e.g.:

SELECT
  case when isStruct(annoyingCol.data) then annoyingCol.data.my_data else null end
FROM SomeDf

I tried this:

import org.apache.spark.sql.Row
import scala.util.Try

val isStruct = udf((r: Row) => {
  Try(r.getAs[String]("estimation_data_inc_waypoints")).isSuccess
})
spark.udf.register("isStruct", isStruct)

but it failed. I know I'm missing something; any help will be much appreciated.

  • Do you mean that the column type is different for different dataframes (with the same column names)? – Shaido May 23 '18 at 08:12
  • Nope, the DataFrame is the same, but the schema may change: the JSON array is not strict and can vary between cases, so each row can be slightly different. Spark does a great job figuring out the JSON schema, but since it only samples some rows, it may miss cases where the JSON structure is different. – Benny Elgazar May 23 '18 at 08:34

1 Answer


Technically speaking, you can create a udf like this:

val isStruct = udf((r: Any) => r match {
  case _: Row => true
  case _ => false
})

val df = Seq(("foo", (1, "bar"))).toDF


df.select(isStruct($"_1")).show
// +-------+
// |UDF(_1)|
// +-------+
// |  false|
// +-------+


df.select(isStruct($"_2")).show
// +-------+
// |UDF(_2)|
// +-------+
// |   true|
// +-------+
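The question already registers the udf for SQL use, and the same function value can be registered here as well; a small usage sketch (reusing the toy DataFrame above, with explicit aliases so the column names are predictable):

// Register under the name used in the question and call it from SQL.
spark.udf.register("isStruct", isStruct)
df.createOrReplaceTempView("SomeDf")

spark.sql("SELECT isStruct(_1) AS c1, isStruct(_2) AS c2 FROM SomeDf").show
// c1 is false (plain string column), c2 is true (struct column)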

but

The column is tricky since sometimes it's a string but in many cases it can be a struct.

doesn't sound right, because a DataFrame cannot contain a heterogeneous column (a column has a single type for all rows), and

need struct type but got string

is a planner error, which wouldn't be raised even if the udf's types didn't match the data. Instead you would get a runtime ClassCastException similar to this:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row

So your problem is somewhere else. Probably here:

annoyingCol.data

If the input is not well structured and annoyingCol is sometimes inferred as StringType, the dot syntax won't work, and you'll get the query planner exception shown in the question.
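A toy reproduction (made-up data, not the asker's) shows where it blows up; note that the exception is thrown during analysis, before a single row is processed, so a runtime check such as the isStruct udf inside a CASE WHEN cannot prevent it:

val stringDf = Seq("just a string").toDF("annoyingCol")

// Fails while the query is being analyzed, not while it runs:
stringDf.select($"annoyingCol.data")
// org.apache.spark.sql.AnalysisException:
// Can't extract value from annoyingCol...: need struct type but got string;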

This should be handled outside the query. You can either check the types:

 df.schema("annoyingCol").dataType match {
   case _: StructType =>  ??? // Take some path
   case _             =>  ??? // Take another path
 }
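For illustration, the first branch could look roughly like this (a sketch only; annoyingCol, data and my_data are the names from the question, and the check should be applied at whichever level of the path actually varies in your data):

import org.apache.spark.sql.functions.lit

// Decide on the expression before the query is planned, so the analyzer
// never has to extract a field from a string column.
val result = df.schema("annoyingCol").dataType match {
  case _: StructType => df.select($"annoyingCol.data.my_data".as("my_data"))
  case _             => df.select(lit(null).cast("string").as("my_data"))
}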

Alternatively, you can just check whether the table has the required column (a possible hasColumn helper is sketched after the snippet):

if (hasColumn(df, "annoyingCol.data")) {
  ??? // Take some path
} else {
  ??? // Take another path
}
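hasColumn is not a Spark built-in; a minimal helper along these lines (an assumption, not part of the original answer) simply probes whether the path resolves against the DataFrame's schema:

import scala.util.Try
import org.apache.spark.sql.DataFrame

// df(path) throws an AnalysisException when the (possibly nested) column
// path cannot be resolved; Try turns that into a Boolean.
def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess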

If you can, I would strongly recommend fixing this upstream, either at the source or before you parse the data into a DataFrame in Spark.
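For the upstream route, one option is to hand the JSON reader an explicit schema instead of letting it infer one from a sample of rows; a sketch (the schema and path below are illustrative placeholders):

import org.apache.spark.sql.types._

// Declare annoyingCol.data as a struct up front, so the column type no
// longer depends on which rows the schema inference happened to sample.
val explicitSchema = StructType(Seq(
  StructField("annoyingCol", StructType(Seq(
    StructField("data", StructType(Seq(
      StructField("my_data", StringType)
    )))
  )))
))

val parsed = spark.read
  .schema(explicitSchema)       // skip inference entirely
  .json("/path/to/input.json")  // placeholder path
// Records that don't match the declared types are handled according to the
// reader's mode option (PERMISSIVE by default).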

Alper t. Turker