
I am trying to find the situations in which Spark skips stages when using RDDs. I know that it will skip stages if a shuffle operation is involved. So, I wrote the following code to see whether that is true:

import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("demo")
  val sc   = new SparkContext(conf)

  val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))

  val c = d.rightOuterJoin(d.reduceByKey(_ + _)).collect
  val f = d.leftOuterJoin(d.reduceByKey(_ + _)).collect
  val g = d.join(d.reduceByKey(_ + _)).collect
}

On inspecting the Spark UI, I see the following jobs with their stages: [screenshot of the Spark UI jobs page]

I was expecting stages 3 and 6 to be skipped, as these use the same RDD to compute the required joins (given that, in case of a shuffle, Spark automatically caches the shuffle output). Can anyone please explain why I am not seeing any skipped stages here? How can I modify the code so that stages are skipped? And are there any situations other than shuffling in which Spark is expected to skip stages?

A Beginner
    https://stackoverflow.com/questions/34580662/what-does-stage-skipped-mean-in-apache-spark-web-ui – thebluephantom Oct 14 '19 at 15:01
  • @thebluephantom That question asks for the meaning of a 'skipped' stage. My question is different. As I have already mentioned in my question, and as the link that you shared points out, Spark caches the data if there is a shuffle. But on writing a simple piece of code where shuffling occurs (join), I am not able to see any skipped stages. – A Beginner Oct 14 '19 at 17:26
  • @thebluephantom Thank you. Let me know if you find the answer :) – A Beginner Oct 14 '19 at 20:14
  • @thebluephantom Thanks! So, I tried your code and compared it with the original one. It seems that Spark automatically skips a stage only when more than one action is applied to the same RDD (having a shuffle). – A Beginner Oct 15 '19 at 09:46
  • That's the point in my view, I will publish after lunch. Shuffling is slightly different to caching. This demonstrates the point: val d = sc.parallelize(0 until 100000).map(i => (i%10000, i)).cache // or not cached, does not matter val c=d.rightOuterJoin(d.reduceByKey(_+_)) val f=d.leftOuterJoin(d.reduceByKey(_+_)) c.count c.collect // skipped, shuffled f.count f.collect // skipped, shuffled – thebluephantom Oct 15 '19 at 09:52
  • I think the answer reflects the question. – thebluephantom Oct 19 '19 at 06:30

1 Answer


Actually, it is very simple.

In your case nothing can be skipped, because each action uses a different join type: each one must scan both d and the reduced RDD d.reduceByKey(_ + _) to compute its result. Even adding .cache (which you do not use, and should, to avoid recomputing all the way back to the source on every action) would make no difference here.

Looking at this simplified version:

val d = sc.parallelize(0 until 100000).map(i => (i % 10000, i)).cache // cached or not, it does not matter

val c = d.rightOuterJoin(d.reduceByKey(_ + _))
val f = d.leftOuterJoin(d.reduceByKey(_ + _))

c.count
c.collect // skipped, shuffle output reused
f.count
f.collect // skipped, shuffle output reused

shows the following jobs for this app:

(4) Spark Jobs
Job 116 View(Stages: 3/3)
Job 117 View(Stages: 1/1, 2 skipped)
Job 118 View(Stages: 3/3)
Job 119 View(Stages: 1/1, 2 skipped)

You can see that successive actions based on the same shuffle output cause one or more stages to be skipped for the second action / job on val c or val f. That is to say: the join types for c and f are fixed, and the two actions on the same join type run sequentially, profiting from prior work. The second action can rely on the shuffle files already written by the first action, since they apply directly to it. That simple.
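Applying the same idea to the original program: a minimal sketch (assuming a local master; SkippedStagesDemo is an illustrative name) is to build the reduced RDD once and reuse it in every join, so that the second and third jobs can skip the stage whose shuffle output the first job already wrote:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SkippedStagesDemo {
  def run(): (Int, Int, Int) = {
    val conf = new SparkConf().setMaster("local").setAppName("skipped-stages-demo")
    val sc   = new SparkContext(conf)
    try {
      val d = sc.parallelize(0 until 1000000).map(i => (i % 100000, i))

      // Build the shuffled RDD ONCE instead of calling reduceByKey inside every join.
      // Each call to reduceByKey creates a new RDD with its own shuffle, which is
      // why the original code showed no skipped stages.
      val r = d.reduceByKey(_ + _)

      // The first action materialises r's shuffle output ...
      val c = d.rightOuterJoin(r).collect
      // ... so the later jobs can skip the stage that produced it
      // (visible as "skipped" in the Spark UI).
      val f = d.leftOuterJoin(r).collect
      val g = d.join(r).collect

      (c.length, f.length, g.length)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Only the stage computing r's shuffle is shared; each join still performs its own cogroup shuffle, which is why the later jobs show some stages skipped rather than all of them.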

thebluephantom