
I am restoring a stream from an HDFS checkpoint (a ConstantInputDStream, for example), but I keep getting SparkException: <X> has not been initialized.

Is there something specific I need to do when restoring from a checkpoint?

I can see that it wants DStream.zeroTime set, but when the stream is restored, zeroTime is null. It doesn't get restored, possibly because it is a private member. I can see that the StreamingContext referenced by the restored stream does have a value for zeroTime.

initialize is a private method and gets called by StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because it expects zeroTime to have been persisted.

Does someone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)

3 Answers


The problem was that I created the DStreams after the StreamingContext had been recreated from the checkpoint, i.e. after StreamingContext.getOrCreate. Creating the DStreams and all transformations should have been done inside createStreamingContext.

The issue was filed as [SPARK-13316] "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards.
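
For illustration, here is a minimal sketch of the corrected setup, with all DStream creation moved inside the factory function passed to getOrCreate (the app name, host, port, and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("checkpoint-example") // placeholder config
val checkpointDir = "hdfs:///tmp/checkpoint-example"             // placeholder path

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)

    // The DStream and its transformations are defined here, so they are
    // restored (and initialized) together with the StreamingContext.
    val socketStream = ssc.socketTextStream("localhost", 9999)
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD(rdd => println(s"batch size: ${rdd.count()}"))

    ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
ssc.start()
ssc.awaitTermination()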


This exception may also occur when you try to use the same checkpointing directory for two different Spark Streaming jobs.

Use a unique checkpoint directory for each Spark job.
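
For example, one simple way to keep the directories distinct (the application name and base path below are placeholders) is to derive the checkpoint path from the job's name:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

val appName = "orders-stream"                       // hypothetical job name
val sparkConf = new SparkConf().setAppName(appName)
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(s"hdfs:///checkpoints/$appName")     // one checkpoint directory per job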


ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FlatMappedDStream@6c17c0f8 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)


The above error occurred because I also had another Spark job writing to the same checkpoint directory. Even though that other job was no longer running, the fact that it had written to the checkpoint directory meant the new Spark job could not configure the StreamingContext.

I deleted the contents of the checkpoint directory and resubmitted the Spark job, and the issue was resolved.

Alternatively, just use a separate checkpoint directory for each Spark job to keep things simple.
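
If you prefer to clear the stale checkpoint data programmatically instead of by hand, a minimal sketch using the Hadoop FileSystem API (the path below is a placeholder) could look like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointDir = new Path("hdfs:///tmp/checkpoint-example")    // placeholder path
val fs = FileSystem.get(checkpointDir.toUri, new Configuration())
fs.delete(checkpointDir, true)                                    // recursive delete of the old checkpoint contents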
