I'm creating a DataFrame with this code:
val data = List(
  List(444.1235D),
  List(67.5335D),
  List(69.5335D),
  List(677.5335D),
  List(47.5335D),
  List(null)
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val schema = StructType(Array(
  StructField("value", DataTypes.DoubleType, true)
))
val df = sqlContext.createDataFrame(rdd, schema)
Then I apply my UDF to it:
val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]
and then I try to use reduce on this Dataset:
val multipl = multip.reduce(_ * _)
Here I get 0.0 as the result. I also tried to filter the nulls out:
val multipl = multip.filter(_ != null).reduce(_ * _)
with the same result. If I remove the null value from data, everything works as it should. How can I make reduce work correctly when the input contains null values?
My UDF is defined like this:
val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)
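For what it's worth, the conversion inside the UDF behaves like this in plain Scala (a minimal sketch of just the UDF body, outside Spark) — the null row comes out as None, because null.toString throws a NullPointerException that Try swallows:

```scala
import scala.util.Try

// Same body as doubleUdf, applied outside Spark
val toDoubleOption: Any => Option[Double] =
  v => Try(v.toString.toDouble).toOption

println(toDoubleOption(444.1235D)) // Some(444.1235)
println(toDoubleOption(null))      // None: null.toString throws an NPE, caught by Try
```

So the null survives the UDF as a None, which Spark writes back as a null in the output column, and .as[Double] then has to decode that null into a primitive Double.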