Moving an existing application from Spark 1.6 to Spark 2.2 has (eventually) brought about the error "org.apache.spark.SparkException: Task not serializable". I have simplified my code down to a minimal example that demonstrates the same error. The code queries a parquet file, which returns the following datatype: 'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]'. I then apply a function that extracts a string and an integer, returning a string. An underlying issue is that Spark 2.2 returns a Dataset as opposed to a DataFrame (see my prior post about the preliminary errors).

How do I write a Dataset encoder to support mapping a function over an 'org.apache.spark.sql.Dataset[String]' in Scala Spark?
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String
scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable
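For what it's worth, passing the encoder explicitly rather than relying on the implicits that spark-shell imports looks like this, as far as I understand the map signature (Encoders.STRING comes from org.apache.spark.sql.Encoders). The exception above complains about serialization rather than a missing encoder, so I am not sure the encoder is actually the problem:

import org.apache.spark.sql.Encoders

// Explicit encoder in place of the implicit Encoder[String] that
// spark.implicits._ would otherwise supply to map's second
// parameter list.
var d3 = d2.map(s => dd(s, 5))(Encoders.STRING)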
My current workaround is to embed the code inline (see below), but this is not practical because my production code involves substantial parameters and functions. I have also tried converting back to a DataFrame (as the code did in Spark 1.6) and several variations of the function definition, none of which has yielded a workable solution.
scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]
scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>
scala> var d2 = d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]
scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)
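For reference, the direction I have been exploring for the parameterized case is to move the function into a standalone serializable object, so the closure passed to map no longer drags in the surrounding shell session, and to pass the encoder explicitly. The object name ParseFunctions is mine, and I have not confirmed this works against my full production code:

import org.apache.spark.sql.Encoders

// Hypothetical container object: defining dd here rather than at the
// shell prompt keeps the closure from capturing the enclosing session
// state (for example the hive handle) when the task is serialized.
object ParseFunctions extends Serializable {
  // Same body as the inline version above; start is retained to show
  // that extra parameters survive the move into the object.
  def dd(s: String, start: Int): String = s + "some string"
}

var d3 = d2.map(s => ParseFunctions.dd(s, 5))(Encoders.STRING)
d3.take(1)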