All right, so I've asked a somewhat similar question about how Spark handles exceptions internally, but the example I had back then wasn't really clear or complete. An answer there pointed me in some direction, but there are still things I can't explain.
I've set up a dummy Spark Streaming app, and in the transform stage I have a russian-roulette expression which may or may not throw an exception. If an exception is thrown, I stop the Spark streaming context. That's it; no other logic, no RDD transformation.
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.util.Random

object ImmortalStreamingJob extends App {
  val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
  val ssc  = new StreamingContext(conf, Seconds(1))

  // 100 small RDDs of 10 elements each, fed to the stream one per batch
  val elems = (1 to 1000).grouped(10)
    .map(seq => ssc.sparkContext.parallelize(seq))
    .toSeq
  val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

  val transformed = stream.transform { rdd =>
    try {
      // russian roulette: roughly a 1-in-6 chance of blowing up
      if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
      else println("lucky bastard")
      rdd
    } catch {
      case e: Throwable =>
        println(s"stopping streaming context: $e")
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }
  }

  transformed.foreachRDD { rdd =>
    println(rdd.collect().mkString(","))
  }

  ssc.start()
  ssc.awaitTermination()
}
Running this in IntelliJ will throw the exception at some point. The fun part:
- if the exception is thrown in the first transformation (when the first RDD is processed), the Spark context is stopped and the app dies, which is what I want
- if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops, which is not what I want
Why does the app hang instead of dying in the second case?
I'm running Spark 2.1.0 on Scala 2.11.8. Taking the try-catch out entirely solves the problem (Spark stops by itself). Moving the try-catch inside foreachRDD also solves the problem.
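For completeness, the foreachRDD variant I tried looks roughly like this (same setup as above, just with the try-catch wrapped around the output action instead of the transform closure):

// Same stream and ssc as in the listing above; the roulette and the
// stop logic now live inside the output action.
stream.foreachRDD { rdd =>
  try {
    if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
    else println("lucky bastard")
    println(rdd.collect().mkString(","))
  } catch {
    case e: Throwable =>
      println(s"stopping streaming context: $e")
      ssc.stop(stopSparkContext = true, stopGracefully = false)
      throw e
  }
}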
However, I'm looking for an answer that can help me understand what's going on in this particular example.