
Moving an existing application from Spark 1.6 to Spark 2.2 has (eventually) produced the error "org.apache.spark.SparkException: Task not serializable". I have simplified my code to a minimal example that reproduces the same error. The code queries a parquet file and returns an org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], to which I apply a function that extracts a string and an integer and returns a string. Part of the issue is that Spark 2.2 returns a Dataset where Spark 1.6 returned a DataFrame (see my prior post about the preliminary errors). How do I write a Dataset encoder to support mapping a function over an org.apache.spark.sql.Dataset[String] in Scala Spark?
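
For reference, the call shape I am trying to make work is sketched below. This is only a sketch of what I think the explicit-encoder form would look like (Encoders.STRING is the built-in string encoder, passed explicitly instead of relying on import spark.implicits._), using the d1, parseCVDP_parquet and dd definitions shown further down:

import org.apache.spark.sql.{Dataset, Encoders}

// Sketch only: supply the String encoder explicitly to each map call
val d2: Dataset[String] = d1.map(parseCVDP_parquet)(Encoders.STRING)
val d3: Dataset[String] = d2.map(s => dd(s, 5))(Encoders.STRING)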

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String

scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable

My current solution to this problem is to embed the code inline (see below), but that is not practical because my production code involves substantial parameters and functions. I have also tried converting to a DataFrame (as it was in Spark 1.6) and various forms of the function definition, none of which has proved to be a workable solution.

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)

1 Answer


org.apache.spark.SparkException: Task not serializable

To fix this issue, put all your functions and variables inside an object, and use those functions and variables wherever they are required.

This way you can fix most serialization issues.

Example

package common

// Helper functions live in a top-level object, so the map closure only
// captures a reference to this serializable object rather than an
// enclosing non-serializable instance.
object AppFunctions {
  def append(s: String, start: Int) = s"${s}some string"
}

object ExecuteQuery {
  import common.AppFunctions._

  [...]

  val d3 = d2.map(s => append(s, 5)) // Pass required values to the method.

  [...]
}
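
For example, a self-contained version of the same idea might look like the sketch below. This is only a sketch: it assumes a local SparkSession and uses a hard-coded sample Dataset in place of the elided query above.

import org.apache.spark.sql.{Dataset, SparkSession}
import common.AppFunctions._

object ExecuteQueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExecuteQueryExample")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Sample data standing in for the result of the parquet query
    val d2: Dataset[String] = Seq("761f006000705904,1521833533.96682").toDS()

    // The closure only references AppFunctions, a top-level object,
    // so the task serializes cleanly
    val d3: Dataset[String] = d2.map(s => append(s, 5))

    d3.take(1).foreach(println)
    spark.stop()
  }
}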
Srinivas