7

I have gone through some videos on YouTube regarding the Spark architecture.

While lazy evaluation, resilience (recreating data in case of failures), and good functional programming concepts are reasons for the success of Resilient Distributed Datasets, one worrying factor is the memory overhead caused by multiple transformations, since data immutability means each transformation produces new data.

If I understand the concept correctly, every transformation creates a new dataset, and hence the memory requirement goes up that many times. If I use 10 transformations in my code, 10 datasets will be created and my memory consumption will increase 10-fold.

e.g.

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

The above example has three transformations: flatMap, map and reduceByKey. Does that imply I need 3X memory for data of size X?

Is my understanding correct? Is caching the RDD the only solution to address this issue?

Once I start caching, it may spill over to disk due to its large size, and performance would be impacted by disk I/O operations. In that case, is the performance of Hadoop and Spark comparable?

EDIT:

From the answer and comments, I have understood lazy evaluation and the pipelining process. My assumption of 3X memory, where X is the initial RDD size, is not accurate.

But is it possible to cache a 1X-sized RDD in memory and update it over the pipeline? How does cache() work?

Ravindra babu
  • Hi, it's been a while since I've worked with Spark (so I forgot a lot of the details), but you don't need 3x the memory. Because of the lazy evaluation you mentioned, there is basically just one operation; thanks to internal optimization routines it's basically just "one big" transformation. Once you start using the disk, the performance should go towards the performance of Hadoop. – Michael Brenndoerfer Feb 02 '16 at 06:50
  • @Michael why would the perf go to MR? – Justin Pihony Feb 02 '16 at 06:54
  • It's still quicker, but of course once you start using the disk the "tendency is towards Hadoop speed", which basically just means it's getting slower. But of course you are right with your answer that you still have performance benefits through e.g. the lazy eval. – Michael Brenndoerfer Feb 02 '16 at 07:00

2 Answers

12

First off, the lazy execution means that functional composition can occur:

scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                 "And yet another test"), 1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27

scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
     | map(word => {println(word);(word,1)}).
     | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
     | collect
This is a test
This
is
a
test
This is another test
This
1+1
is
1+1
another
test
1+1
And yet another test
And
yet
another
1+1
test
2+1
counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))

First note that I force the parallelism down to 1 so that we can see how this looks on a single worker. Then I add a println to each of the transformations so that we can see how the workflow moves. You see that it processes the line, then it processes the output of that line, followed by the reduction. So, there are no separate states stored for each transformation as you suggested. Instead, each piece of data is pushed through the entire chain of transformations until a shuffle is needed, as can be seen in the DAG visualization from the UI:

[DAG visualization from the Spark UI, showing the job split into two stages at the reduceByKey shuffle]
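If the UI is not handy, the same stage boundary shows up in the lineage printed by toDebugString. A minimal sketch, assuming a spark-shell session where `sc` is already defined (RDD IDs and exact formatting vary by Spark version):

// Build the same pipeline, but keep it as an RDD instead of collecting,
// so the lineage can still be inspected.
val rdd = sc.makeRDD(List("This is a test", "This is another test",
                          "And yet another test"), 1)
val wordCounts = rdd.flatMap(_.split(" "))
                    .map(word => (word, 1))
                    .reduceByKey(_ + _)

// The indentation change in the output marks the shuffle boundary,
// i.e. where Spark splits the job into two stages.
println(wordCounts.toDebugString)
// (1) ShuffledRDD[...] at reduceByKey ...
//  +-(1) MapPartitionsRDD[...] at map ...
//     |  MapPartitionsRDD[...] at flatMap ...
//     |  ParallelCollectionRDD[...] at makeRDD ...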

That is the win from the laziness. As for Spark vs. Hadoop, there is already a lot out there (just google it), but the gist is that Spark tends to utilize network bandwidth out of the box, giving it a boost right there. Then, there are a number of performance improvements gained by laziness, especially if a schema is known and you can utilize the DataFrames API.
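To make that last point concrete, here is a rough sketch of the same word count through the DataFrames API. It assumes a Spark 2.x+ SparkSession; the app name is arbitrary and the paths are placeholders, as in the question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

// Sketch only: with a known schema, Catalyst can optimize the whole plan
// before anything is executed.
val spark = SparkSession.builder().appName("wordcount-df").getOrCreate()
import spark.implicits._

val lines  = spark.read.text("hdfs://...")            // single column: "value"
val counts = lines
  .select(explode(split($"value", " ")).as("word"))   // one row per word
  .groupBy("word")
  .count()                                            // still lazy at this point

counts.write.csv("hdfs://...")                        // the write action triggers execution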

So, overall, Spark beats MR hands down in just about every regard.

Justin Pihony
  • Sorry for not understanding one point. Does flatMap().map().reduceByKey() result in three new RDDs, a single RDD, or two RDDs? – Ravindra babu Feb 02 '16 at 09:09
  • Concise and simple way to demonstrate the pipelining concept via println at each stage. A lot of people were asking for confirmation that pipelining works this way; now I know how to demonstrate it easily. Thanks for this! – Alex Larikov Feb 02 '16 at 09:10
  • @ravindra there would be 3 RDDs, but a new RDD doesn't always mean movement of the data, as the actual data in RDDs is stored within partitions, and child RDDs are aware of their parents, which means that `map` and `flatMap` reuse the partitions created by `makeRDD`. In contrast, `reduceByKey` does require shuffling, which always moves data - that's why Spark splits the DAG into two stages. But transformations within the same stage can be optimized by Spark, so makeRDD -> flatMap -> map is run as a single transformation on each row, and intermediate results are discarded without taking extra memory – Alex Larikov Feb 02 '16 at 09:21
  • Got it. It won't be 3X the original size, but still somewhat more than 1X. Immutability has one disadvantage with respect to memory. Is it possible to keep the same RDD in the cache and update it sequentially over the pipeline, limiting the memory requirement to the initial 1X size? – Ravindra babu Feb 02 '16 at 09:25
  • The only way you can update the cache would be to clear it and have the underlying dataset change. The RDD itself is immutable – Justin Pihony Feb 17 '16 at 01:30
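In code, the pattern that comment describes looks roughly like this (a sketch only; the paths are placeholders, as in the question):

// Sketch: a cached RDD is read-only. "Updating" it means dropping the cached
// copy and rebuilding the lineage from the (possibly changed) source data.
val counts = sc.textFile("hdfs://...")
               .flatMap(_.split(" "))
               .map(word => (word, 1))
               .reduceByKey(_ + _)
               .cache()                 // marked for caching, nothing computed yet

counts.count()                          // first action computes and caches the result
counts.count()                          // served from the cache, no recomputation

counts.unpersist()                      // the only way to "refresh": drop the cache...
val refreshed = sc.textFile("hdfs://...")
                  .flatMap(_.split(" "))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)
                  .cache()              // ...and build and cache the pipeline again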
3

The memory requirements of Spark are not 10 times the data size if you have 10 transformations in your Spark job. When you specify the transformation steps of a job, Spark builds a DAG which allows it to execute all the steps in the job. After that, it breaks the job down into stages. A stage is a sequence of transformations which Spark can execute on a dataset without shuffling.

When an action is triggered on the RDD, Spark evaluates the DAG. It applies all the transformations within a stage together until it hits the end of the stage, so it is unlikely for the memory pressure to be 10 times the data size unless each transformation leads to a shuffle (in which case it is probably a badly written job).
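As an illustration (a sketch with made-up data, not code from the question): the narrow transformations below run as one stage and never materialize intermediate datasets, while each wide transformation ends a stage with a shuffle.

val nums = sc.parallelize(1 to 1000000, 4)

// One stage: filter, map and flatMap are narrow, so each row flows through
// all three without any intermediate dataset being stored.
val pipelined = nums.filter(_ % 2 == 0)
                    .map(_ * 10)
                    .flatMap(n => Seq(n, n + 1))

// Wide transformations end a stage and shuffle data across the cluster;
// stacking several of them is the "badly written job" case mentioned above.
val reshuffled = pipelined.map(n => (n % 100, n))
                          .reduceByKey(_ + _)              // shuffle -> stage boundary
                          .map { case (k, sum) => (sum % 7, k) }
                          .groupByKey()                    // another shuffle

reshuffled.count()                                         // the action runs the DAG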

I would recommend watching this talk and going through the slides.

Saket
  • I am clear on the 10X point now. Can you answer the second question too? – Ravindra babu Feb 02 '16 at 09:48
  • You cannot cache() an RDD and update it after that. It's not like the usual cache which we talk about. It just caches the state at a certain point in the DAG. It just ensures that the previous steps in the graph are not calculated multiple times. – Saket Feb 02 '16 at 13:35
  • The link to the slides has changed. It now leads to the main page of the website. – Itération 122442 Nov 03 '20 at 15:41