
Given Table 1 with one column "x" of type String, I want to create Table 2 with a column "y" that is an integer representation of the date strings given in "x".

It is essential to keep the null values in column "y".

Table 1 (Dataframe df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)

Table 2 (Dataframe df2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)

The user-defined function (UDF) below converts values from column "x" into those of column "y":

val extractDateAsInt = udf[Int, String](
  (d: String) => d.substring(0, 10)
    .filterNot("-".toSet)
    .toInt)

While it works, dealing with null values is not possible.

Even though I can do something like

val extractDateAsIntWithNull = udf[Int, String](
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else 1)

I have found no way to "produce" null values via UDFs (of course, since an Int cannot be null).

My current solution for creation of df2 (Table 2) is as follows:

// holds data of table 1  
val df1 = ... 

// filter entries from df1 that are not null
val dfNotNulls = df1.filter(df1("x").isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left outer join of df1 and dfNotNulls
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")
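
For completeness, here is a minimal, self-contained version of this workaround (a sketch only; it assumes Spark 2.x with a local SparkSession, and the sample data mirrors Table 1):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Table 1: a single nullable string column "x"
val df1 = Seq("2015-09-12", "2015-09-13", null, null).toDF("x")

// the UDF from above
val extractDateAsInt = udf[Int, String]((d: String) =>
  d.substring(0, 10).filterNot("-".toSet).toInt)

// keep only non-null rows, convert, and rename the join key
val dfNotNulls = df1.filter($"x".isNotNull)
  .withColumn("y", extractDateAsInt($"x"))
  .withColumnRenamed("x", "right_x")

// re-attach the converted values to the full table, restoring the null rows
val df2 = df1.join(dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter")
  .drop("right_x")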

Questions:

  • The current solution seems cumbersome (and probably not efficient with respect to performance). Is there a better way?
  • @Spark-developers: Is there a type NullableInt planned / available, such that the following UDF is possible (see the code excerpt below)?

Code excerpt

val extractDateAsNullableInt = udf[NullableInt, String](
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else null)
Martin Senne
  • Easy solution is to use boxed types: http://stackoverflow.com/questions/42791912/how-to-deal-with-spark-udf-input-output-of-primitive-nullable-type/42791913#42791913 – Artur Rashitov Mar 14 '17 at 16:57

3 Answers


This is where Option comes in handy:

val extractDateAsOptionInt = udf((d: String) => d match {
  case null => None
  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})

or, to make it slightly safer in the general case:

import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
  d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)

All credit goes to Dmitriy Selivanov, who pointed out this solution as a (missing?) edit here.
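
The resulting UDF can then be applied directly (a minimal sketch, assuming the df1 from the question and spark.implicits._ in scope):

val df2 = df1.withColumn("y", extractDateAsOptionInt($"x"))

df2.printSchema()
// root
//  |-- x: string (nullable = true)
//  |-- y: integer (nullable = true)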

An alternative is to handle null outside the UDF:

import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
   (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
)
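
Since when/otherwise only evaluates the branch that matches, the UDF is never invoked on null inputs here. A quick check (a sketch, assuming the df1 from the question):

df1.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
).show()
// +----------+--------+
// |         x|       y|
// +----------+--------+
// |2015-09-12|20150912|
// |2015-09-13|20150913|
// |      null|    null|
// |      null|    null|
// +----------+--------+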
zero323
  • Hi zero323, sounds awesome. Will try that out, and as soon as it works, reward you! BTW, thanks for the fast response!!! – Martin Senne Sep 02 '15 at 17:11
  • Don't use the solution using `scala.util.Try`. It will catch any errors inside. This is not good coding style. – Niclas von Caprivi Feb 05 '20 at 09:56
  • @NiclasvonCaprivi With a few well defined exceptions (and these usually happen at the level of the execution plan resolution) this is how SQL functions behave ‒ in case unexpected conditions, the value is undefined. – 10465355 Feb 06 '20 at 10:14

Scala actually has a nice factory function, Option(), that can make this even more concise:

val extractDateAsOptionInt = udf((d: String) => 
  Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))

Internally the Option object's apply method is just doing the null check for you:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
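
For illustration, the same null handling can be seen at the plain-Scala level (a REPL sketch):

scala> Option(null: String).map(_.filterNot("-".toSet).toInt)
res0: Option[Int] = None

scala> Option("2015-09-12").map(_.filterNot("-".toSet).toInt)
res1: Option[Int] = Some(20150912)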
tristanbuckner

Supplementary code

Building on the nice answer of @zero323, I created the following code to have user-defined functions available that handle null values as described. I hope it is helpful for others!

/**
 * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
 * handle `null` values.
 */
object NullableFunctions {

  import org.apache.spark.sql.functions._
  import scala.reflect.runtime.universe.{TypeTag}
  import org.apache.spark.sql.UserDefinedFunction

  /**
   * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if the function input is null, None is returned. This will create a null value in the output Spark column.
   *   * if the input is non-null, Some(f(input)) is returned, thus creating f(input) as the value in the output column.
   * @param f function from A1 => RT
   * @tparam RT return type
   * @tparam A1 input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour described above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    udf[Option[RT], A1]((i: A1) => i match {
      case null => None
      case _ => Some(f(i))
    })
  }

  /**
   * Given a function (A1, A2) => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if one of the function's input parameters is null, None is returned.
   *     This will create a null value in the output Spark column.
   *   * if both input parameters are non-null, Some(f(input1, input2)) is returned, thus creating
   *     f(input1, input2) as the value in the output column.
   * @param f function from (A1, A2) => RT
   * @tparam RT return type
   * @tparam A1 first input parameter type
   * @tparam A2 second input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour described above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
    udf[Option[RT], A1, A2]((i1: A1, i2: A2) => (i1, i2) match {
      case (null, _) => None
      case (_, null) => None
      case (s1, s2) => Some(f(s1, s2))
    })
  }
}
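
A usage sketch (the name extractDateAsIntOrNull is illustrative; it assumes the df1 from the question):

import org.apache.spark.sql.functions.col
import NullableFunctions._

// null-safe variant of the date conversion from the question
val extractDateAsIntOrNull = nullableUdf((d: String) =>
  d.substring(0, 10).filterNot("-".toSet).toInt)

// "y" becomes a nullable integer column: null wherever "x" is null
val df2 = df1.withColumn("y", extractDateAsIntOrNull(col("x")))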
Martin Senne