
I know that some Spark actions, such as collect(), can cause performance issues.

The documentation says:

To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).

And from a related SE question, "Spark runs out of memory when grouping by key":

I have learned that groupByKey() and reduceByKey() may cause out-of-memory errors if parallelism is not set properly.
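To make the parallelism point concrete, here is a minimal sketch, assuming a local SparkSession and made-up data. The object name ShuffleParallelism, the helper countPerKey and the partition count of 16 are illustrative choices, not taken from the linked question. Both reduceByKey() and groupByKey() accept an explicit number of partitions for the shuffle output:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ShuffleParallelism {
  // Hypothetical helper: count occurrences per key and spread the shuffle
  // output over `numPartitions` partitions instead of the default.
  def countPerKey(pairs: RDD[(Int, Long)], numPartitions: Int): RDD[(Int, Long)] =
    pairs.reduceByKey(_ + _, numPartitions)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run, for illustration only
      .appName("shuffle-parallelism")
      .getOrCreate()
    val sc = spark.sparkContext

    // Made-up data: 100,000 records spread over 100 keys.
    val pairs = sc.parallelize(1 to 100000, 4).map(i => (i % 100, 1L))

    val counts = countPerKey(pairs, 16)
    println(s"reduceByKey partitions: ${counts.getNumPartitions}")

    // groupByKey takes the same argument, but it moves every value through
    // the shuffle because it does not combine values on the map side.
    val grouped = pairs.groupByKey(16)
    println(s"groupByKey partitions: ${grouped.getNumPartitions}")

    spark.stop()
  }
}
```

More partitions mean smaller shuffle blocks per task, which is usually what relieves memory pressure; the right number depends on the data and the cluster.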

I could not find enough evidence about other transformations and actions that have to be used with caution.

Are these three the only commands that need care? I have doubts about the commands below too:

  1. aggregateByKey()
  2. sortByKey()
  3. persist() / cache()

It would be great if you could provide information on intensive commands (those that work globally across partitions rather than within a single partition, or that perform poorly) and therefore need to be used with more care.

Ravindra babu

1 Answer


You have to consider three types of operations:

  • transformations implemented using only mapPartitions(WithIndex), like filter, map, flatMap, etc. Typically this is the safest group. Probably the biggest issue you can encounter is extensive spilling to disk.
  • transformations which require a shuffle. This includes obvious suspects like the different variants of combineByKey (groupByKey, reduceByKey, aggregateByKey) or join, and less obvious ones like sortBy, distinct or repartition. Without context (data distribution, the exact function used for reduction, partitioner, resources) it is hard to tell whether a particular transformation will be problematic. There are two main factors:
    • network traffic and disk IO - any operation which is not performed in memory will be at least an order of magnitude slower.
    • skewed data distribution - if the distribution is highly skewed, the shuffle can fail or subsequent operations may suffer from suboptimal resource allocation.
  • operations which require passing data to and from the driver. Typically this covers actions like collect or take, and creating a distributed data structure from a local one (parallelize).

    Other members of this category are broadcasts (including automatic broadcast joins) and accumulators. The total cost depends, of course, on the particular operation and the amount of data.
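The three groups can be told apart by looking at an RDD's dependencies. The following is a minimal sketch, assuming a local SparkSession and a tiny made-up word list; the object name OperationKinds and the helpers toPairs and wordCounts are hypothetical and only serve the illustration:

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object OperationKinds {
  // Group 1: narrow transformations, each output partition depends on
  // exactly one input partition, so no data moves between executors.
  def toPairs(words: RDD[String]): RDD[(String, Int)] =
    words.filter(_ != "c").map(w => (w, 1))

  // Group 2: a transformation that requires a shuffle.
  def wordCounts(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
    pairs.reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run, for illustration only
      .appName("operation-kinds")
      .getOrCreate()
    val sc = spark.sparkContext

    // Group 3 (driver -> cluster): parallelize ships a local collection out.
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), 3)

    val pairs = toPairs(words)
    println(pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]])

    val counts = wordCounts(pairs)
    println(counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])

    // Group 3 (cluster -> driver): take fetches a bounded number of rows,
    // collectAsMap fetches everything and must fit in driver memory.
    val firstTwo = counts.take(2)
    val everything = counts.collectAsMap()
    println(s"${firstTwo.length} of ${everything.size} keys fetched by take")

    spark.stop()
  }
}
```

Calling toDebugString on an RDD prints the same lineage in readable form, which is a quick way to spot where shuffles occur in a longer pipeline.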

While some of these operations can be expensive, none is particularly bad by itself (including the demonized groupByKey). Obviously it is better to avoid network traffic or additional disk IO, but in practice you cannot avoid them in any complex application.

Regarding cache, you may find "Spark: Why do i have to explicitly tell what to cache?" useful.
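As a rough illustration of when caching pays off, here is a minimal sketch, assuming a local SparkSession and made-up data; the object name CacheSketch, the helper countAndSum and the choice of MEMORY_AND_DISK are illustrative assumptions. An RDD that feeds more than one action is persisted before the first action and released afterwards:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  // Hypothetical helper: run two actions over the same RDD. Without
  // persist(), the lineage of `squares` would be recomputed for each action.
  def countAndSum(squares: RDD[Long]): (Long, Long) = {
    squares.persist(StorageLevel.MEMORY_AND_DISK) // lazy: nothing is stored yet
    val n = squares.count()                       // first action fills the cache
    val total = squares.reduce(_ + _)             // second action reuses it
    squares.unpersist()                           // release the storage explicitly
    (n, total)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run, for illustration only
      .appName("cache-sketch")
      .getOrCreate()

    // Made-up data standing in for an expensive computation.
    val squares = spark.sparkContext.parallelize(1 to 1000, 4).map(x => x.toLong * x)

    val (n, total) = countAndSum(squares)
    println(s"count = $n, sum = $total")

    spark.stop()
  }
}
```

cache() is shorthand for persist() with the default storage level; for an RDD that is used only once, persisting adds cost without any benefit.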

zero323
  • Thanks for the info. What is the impact of cache() or persist()/unpersist() on an RDD? – Ravindra babu Jan 15 '16 at 06:37
  • Every RDD is deleted as soon as it is no longer needed. This is unlike Java, where an object is kept as long as you hold a reference to it, because the real computations are triggered only by certain functions. So, if you want to reuse an RDD, you can assign it to a variable, cache it, apply further transformations, and then call `collect`, for example. The RDD will then remain cached in the state it was in when you called `cache()`. – szefuf Jan 15 '16 at 09:12
  • zero323, on a different note, can you check this question: http://stackoverflow.com/questions/35146482/spark-scala-transformations-immutability-memory-overheads – Ravindra babu Feb 04 '16 at 10:30
  • @ravindra Noted. Existing answers look OK, but I'll try to add some remarks when I have some spare time. – zero323 Feb 04 '16 at 23:11