2

I have code like following:

// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }  
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1 and rst2 get the sample result. I thought rst1 require more memory (100 times) but only one reduceByKey tranform; however, rst2 require less memory but more reduceByKey tranforms (99 times). So, is it a game of time and space tradeoff?

My question is: whether my analysis above is right, or Spark treat translate the actions in the same way internally?

P.S.: rst1 union all sub rdd then reduceByKey,which reduceByKey is outside reduce. rst2 reduceByKey one by one, which reduceByKey is inside reduce.

Community
  • 1
  • 1
bourneli
  • 2,172
  • 4
  • 24
  • 40
  • 1
    I'm not sure that I understand your question. rst1 and rst2 have the same code but one uses placeholders for the reduce and the other one doesn't. – eliasah Jun 08 '16 at 06:22
  • rst1 union all sub rdd then reduceByKey,which reduceByKey is **outside** reduce. rst2 reduceByKey one by one, which reduceByKey is **inside** reduce. – bourneli Jun 08 '16 at 06:37
  • Oh sorry, I've thought that reduceByKey in rst2 is outside as well. – eliasah Jun 08 '16 at 06:43
  • 1
    you're right about the number of `reduceByKey` as for memory, I personally thought that they will use the same amount but after doing a very simple test it turns out that the second one is not only slower but also uses up more memory. Might be do to the number of shuffles it has to do. – Mateusz Dymczyk Jun 08 '16 at 07:46
  • Actually both are quite inefficient although how much depends on configuration. – zero323 Jun 08 '16 at 08:35

1 Answers1

3

Long story short both solutions are relatively inefficient but the second one is worst than the first.

Let's start by answering the last question. For low level RDD API there are only two types of global automatic optimizations (instead):

  • using explicitly or implicitly cached tasks results instead recomputing complete lineage
  • combining multiple transformations which don't require a shuffle into a single ShuffleMapStage

Everything else is pretty much a sequential transformations which defines DAG. This stays in contrast to more restrictive, high level Dataset (DataFrame) API, which makes specific assumptions about transformations and perform global optimizations of the execution plan.

Regarding your code. The biggest problem with the first solution is a growing lineage when you apply iterative union. It makes some things, like failure recovery expensive, and since RDDs are defined recursively, can fail with StackOverflow exception. A less serious side effect is a growing number of partitions which is doesn't seem to be compensated in the subsequent reduction*. You'll find a more detailed explanation in my answer to Stackoverflow due to long RDD Lineage but what you really need here is a single union like this:

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)

This is actually an optimal solution assuming you apply truly reducing function.

The second solution obviously suffers from the same problem, nevertheless it gets worse. While the first approach requires only two stages with a single shuffle, this requires a shuffle for each RDD. Since number of partitions is growing and you use default HashPartitioner each piece of data has to be written to disk multiple times and most likely shuffled over the network multiple times. Ignoring low level calculations each record is shuffled O(N) times where N is a number of RDDs you merge.

Regarding memory usage it is not obvious without knowing more about data distribution but in the worst case scenario the second method can express significantly worse behavior.

If + works with constant space the only requirement for reduction is a hashmap to store the results of map side combine. Since partitions are processed as a stream of data without reading complete content into memory, this means that total memory size for each task will be proportional to the number of unique keys and not the amount of data. Since the second method requires more tasks overall memory usage will be higher than the first case. On average it can be slightly better due to data being partially organized but it is rather unlikely to compensate additional costs.


* If you want to learn how it can affect overall performance you can see Spark iteration time increasing exponentially when using join This is slightly different problem but should give you some idea why controlling number of partitions matters.

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 2
    sc.union does not improve performece obviously. However, inspired by the general optimazation strategies: reduce lineage and shullfe, I have stored the middle result, used **persist** and **checkpoint** carefully, and remove the last reduceByKey. The performance has improved greatly. Thanks @zero323. And also, thanks for eliasah, for your examples. – bourneli Jun 16 '16 at 12:42
  • @eliasah could you share those examples ? – Knight71 Jul 21 '20 at 10:45
  • I am trying to load small data set and then do reduceByKey and then keep the reducedRdd. And then repeat load/reduce with persisted RDD. – Knight71 Jul 21 '20 at 10:47