When I create the UDF shown below, I get a task serialization error. The error appears only when I run the code in cluster deploy mode with spark-submit; the same code works in spark-shell.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

def mfnURL(arr: WrappedArray[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}

val mfnURLUDF = udf(mfnURL _)

def windowSpec = Window.partitionBy("nodeId", "url", "typology")
val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURLUDF(collect_list("source_url")),
    min("minTimestamp"),
    max("maxTimestamp")
  )

I tried adding spark.udf.register("mfnURLUDF", mfnURLUDF), but that did not solve the problem.

Markus

1 Answer

You can also try creating the UDF directly from a function value instead of a method:

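val mfnURL = udf { arr: WrappedArray[String] =>
  val filterArr = arr.filterNot(_ == null)
  // Use if/else as an expression: `return` inside a function literal
  // either fails to compile or performs a non-local return.
  if (filterArr.isEmpty) null
  else filterArr.groupBy(identity).maxBy(_._2.size)._1
}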
merenptah
  • I am not an expert, but in the absence of merenptah, I think the point is that a def is a method that belongs to a class or object, and that causes the serialization issue, whereas a val function does not. @Markus – thebluephantom Sep 08 '18 at 23:28
  • I am hoping he will respond. Here is a good link imho https://stackoverflow.com/questions/43592742/spark-scala-task-not-serializable-error/43596624 – thebluephantom Sep 09 '18 at 12:21
  • The udf function creates a serializable class: class org.apache.spark.sql.expressions.UserDefinedFunction extends Object implements Serializable – merenptah Sep 09 '18 at 18:11
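
The def-versus-val point made in the comments can be sketched without Spark, using plain Java serialization. This is only an illustration of the mechanism, not a confirmed diagnosis of the job in the question. The names Driver, Standalone, SerializationCheck, mostFrequent, fromMethod and fromLiteral are hypothetical and do not appear in the original code. The assumption is that the method lives in a class that is not Serializable, so a function value built from it drags the enclosing instance along.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for an enclosing class that is NOT Serializable.
class Driver {
  def mostFrequent(arr: Seq[String]): String = {
    val nonNull = arr.filterNot(_ == null)
    if (nonNull.isEmpty) null
    else nonNull.groupBy(identity).maxBy(_._2.size)._1
  }

  // Turning the method into a function value keeps a reference to `this`,
  // so serializing the function also tries to serialize the Driver instance.
  val fromMethod: Seq[String] => String = mostFrequent _
}

object Standalone {
  // A function literal that refers to nothing outside its own body.
  val fromLiteral: Seq[String] => String = { arr =>
    val nonNull = arr.filterNot(_ == null)
    if (nonNull.isEmpty) null
    else nonNull.groupBy(identity).maxBy(_._2.size)._1
  }
}

object SerializationCheck {
  // Returns true if Java serialization accepts the object, false if it
  // fails with NotSerializableException.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under these assumptions, SerializationCheck.serializes(new Driver().fromMethod) should report a failure, while SerializationCheck.serializes(Standalone.fromLiteral) should succeed. Making the enclosing class Serializable, or moving the method into a top-level object, are other ways to avoid capturing a non-serializable instance.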