I ran into this error when trying to run a Spark Streaming application with checkpointing enabled.
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.spark.streaming.StreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@63cf0da6)
- field (class: com.sales.spark.job.streaming.SalesStream, name: streamingContext, type: class org.apache.spark.streaming.StreamingContext)
- object (class com.sales.spark.job.streaming.SalesStreamFactory$$anon$1, com.sales.spark.job.streaming.SalesStreamFactory$$anon$1@1738d3b2)
- field (class: com.sales.spark.job.streaming.SalesStream$$anonfun$runJob$1, name: $outer, type: class com.sales.spark.job.streaming.SalesStream)
- object (class com.sales.spark.job.streaming.SalesStream$$anonfun$runJob$1, <function1>)
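If I am reading the serialization stack correctly, the anonymous function created in runJob keeps an implicit $outer reference to SalesStream, and SalesStream holds the non-serializable StreamingContext in its streamingContext field. Here is a self-contained sketch of that capture mechanism, with hypothetical stand-in names (Holder for the StreamingContext, Outer for SalesStream), just to illustrate what I think is going on:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Holder plays the role of StreamingContext: a field type that is not Serializable.
class Holder

class Outer {                        // plays the role of SalesStream
  val holder = new Holder            // like the streamingContext field

  // The lambda reads a field of Outer, so it is compiled with an implicit
  // $outer reference to this Outer instance -- the same $outer the stack shows.
  def makeClosure(): () => Holder = () => holder
}

object CaptureDemo extends App {
  val closure = new Outer().makeClosure()
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  out.writeObject(closure)           // java.io.NotSerializableException: Outer
}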
This happens when trying to execute the piece of code below. I am thinking the issue has to do with trying to access the SparkSession variable inside the tempTableView function.
Code
liveRecordStream
  .foreachRDD(newRDD => {
    if (!newRDD.isEmpty()) {
      val cacheRDD = newRDD.cache()
      val updTempTables = tempTableView(t2s, stgDFMap, cacheRDD)
      val rdd = updatestgDFMap(stgDFMap, cacheRDD)
      persistStgTable(stgDFMap)
      dfMap
        .filter(entry => updTempTables.contains(entry._2))
        .map(spark.sql)
        .foreach(df => writeToES(writer, df))
    }
  })
tempTableView
// Record is a placeholder for the stream's record type (it exposes Name and values)
def tempTableView(t2s: Map[String, StructType], stgDFMap: Map[String, DataFrame], cacheRDD: RDD[Record]): Set[String] = {
  stgDFMap.keys.filter { table =>
    val tRDD = cacheRDD
      .filter(r => r.Name == table)
      .map(r => r.values)
    val nonEmpty = !tRDD.isEmpty()   // evaluate once instead of triggering two jobs
    if (nonEmpty) {
      val tDF = spark.createDataFrame(tRDD, tableNameToSchema(table))
      val tName = s"temp_$table"
      tDF.createOrReplaceTempView(tName)
    }
    nonEmpty
  }.toSet
}
I am not sure how to get the SparkSession variable inside this function, which is called inside foreachRDD.
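The only workaround I can think of is to rebuild the session from the RDD's own SparkContext inside foreachRDD and pass it to tempTableView explicitly, rather than referencing the enclosing class's field. An untested sketch of what I mean (tempTableView would need to take the session as an extra parameter):

liveRecordStream.foreachRDD { newRDD =>
  // getOrCreate() returns the already-running session on the driver,
  // so the closure does not have to capture anything from the enclosing class.
  val session = SparkSession.builder.config(newRDD.sparkContext.getConf).getOrCreate()
  if (!newRDD.isEmpty()) {
    val cacheRDD = newRDD.cache()
    val updTempTables = tempTableView(session, t2s, stgDFMap, cacheRDD)
    // ... rest of the body as in the snippet above, using session instead of spark
  }
}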
I am instantiating the streamingContext as part of a different class.
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

class Test {
  lazy val sparkSession: SparkSession =
    SparkSession
      .builder()
      .appName("testApp")
      .config("es.nodes", SalesConfig.elasticnode)
      .config("es.port", SalesConfig.elasticport)
      .config("spark.sql.parquet.filterPushdown", parquetFilterPushDown)
      .config("spark.debug.maxToStringFields", 100000)
      .config("spark.rdd.compress", rddCompress)
      .config("spark.task.maxFailures", 25)
      .config("spark.streaming.unpersist", streamingUnPersist)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()   // the builder alone does not produce a session

  lazy val streamingContext: StreamingContext =
    new StreamingContext(sparkSession.sparkContext, Seconds(15))

  streamingContext.checkpoint("/Users/gswaminathan/Guidewire/Java/explore-policy/checkpoint/")
}
I tried making this class extend Serializable, but no luck.
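Concretely, the change was just the declaration below (body unchanged from the Test class above), and the exception stays the same:

class Test extends Serializable {
  // same sparkSession / streamingContext members as above
}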