In my application, I want to keep track of multiple states, so I tried to encapsulate the whole state management logic in a class StateManager as follows:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

@SerialVersionUID(xxxxxxxL)
class StateManager(
    inputStream: DStream[(String, String)],
    initialState: RDD[(String, String)]
) extends Serializable {
  lazy val state = inputStream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec
    .function(trackStateFunc _)
    .initialState(initialState)
    .timeout(Seconds(30))
  def trackStateFunc(key: String, value: Option[String], state: State[String]): Option[(String, String)] = {}
}
object StateManager {
  // Forward the arguments under the names declared in the parameter list
  def apply(dstream: DStream[(String, String)], initialstate: RDD[(String, String)]) =
    new StateManager(dstream, initialstate)
}
The @SerialVersionUID(xxxxxxxL) ... extends Serializable part is an attempt to solve my problem.
But when I call StateManager from my main class like this:
val lStreamingEnvironment = StreamingEnvironment(streamingWindow, checkpointDirectory)
val statemanager = StateManager(lStreamingEnvironment.sparkContext, 1, None)
val state = statemanager.state(lKafkaStream)
state.foreachRDD(_.foreach(println))
(see below for StreamingEnvironment), I get:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
[...]
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
The error message is clear, but I still don't understand at what point it is triggered.
Where exactly does the DStream get pulled into a closure? What can I do to fix this and still have a reusable class?
The StreamingEnvironment class, in case it is useful:
class StreamingEnvironment(mySparkConf: SparkConf, myKafkaConf: KafkaConf, myStreamingWindow: Duration, myCheckPointDirectory: String) {
  val sparkContext = spark.SparkContext.getOrCreate(mySparkConf)
  // Use the constructor parameters; the original referred to undefined names
  lazy val streamingContext = new StreamingContext(sparkContext, myStreamingWindow)
  streamingContext.checkpoint(myCheckPointDirectory)
  streamingContext.remember(Minutes(1))
  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
}
object StreamingEnvironment {
  def apply(streamingWindow: Duration, checkpointDirectory: String) = {
    // setup sparkConf and kafkaConf
    new StreamingEnvironment(sparkConf, kafkaConf, streamingWindow, checkpointDirectory)
  }
}
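
One possible reading of the exception, sketched without Spark so it can be run on its own: writing trackStateFunc _ inside the class turns an instance method into a function object that keeps a reference to this, and this holds the DStream. The names FakeStream, Manager and ClosureCheck below are hypothetical stand-ins, not Spark APIs, and plain Java serialization is used here as an assumption about what the closure check roughly does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a DStream: deliberately NOT Serializable.
class FakeStream

// Same shape as StateManager: Serializable itself, but holding a
// non-serializable constructor argument as a field.
class Manager(val stream: FakeStream) extends Serializable {
  def track(key: String, value: Option[String]): Option[(String, String)] =
    value.map(v => (key, v))

  // Eta-expanding an instance method yields a function that references `this`,
  // so serializing the function also tries to serialize `stream`.
  val viaInstance: (String, Option[String]) => Option[(String, String)] = track _
}

object Manager {
  // The same logic as a function value in the companion object.
  // Its body never touches a Manager instance, so nothing extra is captured.
  val viaCompanion: (String, Option[String]) => Option[(String, String)] =
    (key, value) => value.map(v => (key, v))
}

object ClosureCheck {
  // Returns true if plain Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If this reading is right, moving trackStateFunc into the companion object (or any standalone object) and passing that function to StateSpec.function might keep the DStream out of the task closure while the class stays reusable. Marking the class Serializable would not help on its own, since the DStream field still cannot be serialized.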