
Here is a Spark UDF that I'm using to compute a value based on a few columns.

def spark_udf_func(s: String, i:Int): Boolean = { 
    // I'm returning true regardless of the parameters passed to it.
    true
}

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+

Can anyone explain why the value for column valid is null for the last row?

When I checked the Spark plan, I was able to figure out that it contains a case condition: if column 2 (DepartmentID) is null, it returns null.

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]

Why does Spark behave this way?
Why does this happen only for the Integer column?
What am I doing wrong here, and what is the proper way to handle nulls within a UDF when one of the UDF parameters is null?

– Sudev Ambadi
  • see also https://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type – Raphael Roth Sep 05 '17 at 11:39

2 Answers


The issue is that null is not a valid value for the Scala Int that backs the column, while it is a valid value for String. Int is equivalent to the Java int primitive and must always have a value. This means the UDF can't be called when the value is null, and therefore null remains.

There are two ways to solve this:

  1. Change the function to accept java.lang.Integer (which is an object and can be null).
  2. If you can't change the function, use when/otherwise to do something special when the value is null, for example when(col("int col").isNull, someValue).otherwise(the original call); see the sketch after this list.
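
A minimal sketch of option 2, reusing the df and spark_udf from the question; the fallback value false is an assumption here, substitute whatever default suits your data:

import org.apache.spark.sql.functions.{col, lit, when}

// Skip the UDF entirely when DepartmentID is null and return a default instead
df.withColumn(
  "valid",
  when(col("DepartmentID").isNull, lit(false))
    .otherwise(spark_udf(col("LastName"), col("DepartmentID")))
).show()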
– Assaf Mendelson
    there is also a third option which allows you to stick with Scala Int: pack the arguments in a struct (using `df.withColumn("valid", spark_udf(struct(df.col("LastName"), df.col("DepartmentID"))))`) and use `Row` as the input parameter for the udf. Inside the udf, you can then check the row for null values using `row.isNullAt(i: Int)` – Raphael Roth Sep 05 '17 at 11:35
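
A minimal sketch of that struct/Row approach on Spark 2.x (the version used in the question); the name row_udf is just illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{struct, udf}

// The whole row is passed in, so the UDF itself decides how to treat nulls
val row_udf = udf((row: Row) => {
  if (row.isNullAt(1)) false   // DepartmentID is null
  else true                    // otherwise you could read row.getString(0) / row.getInt(1)
})

df.withColumn("valid", row_udf(struct(df.col("LastName"), df.col("DepartmentID")))).show()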

To accept null, please use java.lang.Integer (the Java datatype) instead of Scala Int:

def spark_udf_func(s: String, i: Integer): Boolean = {
    // java.lang.Integer (unlike Scala Int) can hold null, so it can be checked here
    if (i == null) false else true
}
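
For reference, a usage sketch assuming the same df as in the question: wrapped and applied exactly as before, this version receives the null and should return false for the Williams row instead of leaving the column null.

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()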
– SunnyShah