
I'm trying to write a Spark UDF that replaces the null values of a Double field with 0.0. I'm using the Dataset API. Here's the UDF:

val coalesceToZero = udf((rate: Double) => if (Option(rate).isDefined) rate else 0.0)

This is based on the following function, which I tested and found to work fine:

scala> def cz(value: Double): Double = if (Option(value).isDefined) value else 0.0
cz: (value: Double)Double

scala> cz(null.asInstanceOf[Double])
res15: Double = 0.0

But when I use it in Spark as follows, the UDF doesn't work:

myDS.filter($"rate".isNull)
    .select($"rate", coalesceToZero($"rate")).show

+----+---------+
|rate|UDF(rate)|
+----+---------+
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
|null|     null|
+----+---------+

However the following works:

val coalesceToZero = udf((rate: Any) => if (rate == null) 0.0 else rate.asInstanceOf[Double])

So I was wondering if Spark has some special way of handling null Double values.

Jit B
  • This looks like _DataFrame_ API, not _Dataset_ API - what's the type of `myDS`? – Tzach Zohar Jul 24 '17 at 16:20
  • It's a big case class where the type of the `rate` field is Double. I initialized the dataset by doing `sourceDF.as[MyCaseClass]` – Jit B Jul 24 '17 at 16:42
  • See related issue https://issues.apache.org/jira/browse/SPARK-12648 - a suggestion to allow UDFs with `Option[Double]` as input for nullable Double columns (and other primitives) – Tzach Zohar Jul 24 '17 at 21:51

1 Answer


scala.Double cannot be null, and the function you use only seems to work because:

scala> null.asInstanceOf[Double]
res2: Double = 0.0

(You can find excellent answers describing this behavior in If an Int can't be null, what does null.asInstanceOf[Int] mean?).
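In other words, by the time Option(rate) is evaluated the null has already been unboxed to 0.0, so isDefined is always true and the else branch can never run. A quick REPL check illustrates this (result numbers will vary):

scala> Option(null.asInstanceOf[Double])
res3: Option[Double] = Some(0.0)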

If myDS is a statically typed Dataset, the right way is to use either Option[Double]:

case class MyCaseClass(rate: Option[Double])

or java.lang.Double:

case class MyCaseClass(rate: java.lang.Double)

Either of these lets you handle nulls with the statically typed API (not SQL / DataFrame), with the latter representation being preferable from a performance perspective.
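For example, with the Option[Double] variant the whole replacement can stay in the typed API (a minimal sketch, reusing the Option[Double] case class above and assuming the sourceDF and spark session from the question):

import spark.implicits._

val myDS = sourceDF.as[MyCaseClass]

// Missing rates become Some(0.0); existing rates pass through unchanged
val filled = myDS.map(r => r.copy(rate = Some(r.rate.getOrElse(0.0))))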

In general, I'd recommend filling NULLs using the SQL API:

import org.apache.spark.sql.functions.{coalesce, lit}

myDS.withColumn("rate", coalesce($"rate", lit(0.0)))

or DataFrameNaFunctions.fill:

df.na.fill(0.0, Seq("rate"))

before you convert Dataset[Row] to Dataset[MyCaseClass].
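Putting it together, the fill happens on the untyped side just before the conversion (a sketch, again assuming sourceDF and spark from the question, and a simplified one-field MyCaseClass standing in for the real one; once the nulls are gone, rate can stay a plain Double):

import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._

case class MyCaseClass(rate: Double)

val myDS = sourceDF
  .withColumn("rate", coalesce($"rate", lit(0.0)))  // or: sourceDF.na.fill(0.0, Seq("rate"))
  .as[MyCaseClass]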

zero323