
Sometimes Spark "optimizes" a dataframe plan in an inefficient way. Consider the following example in Spark 2.1 (can also be reproduced in Spark 1.6):

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))

df_result
  .coalesce(1)
  .write.saveAsTable(tablename)

In this example I want to write a single file after an expensive transformation of a dataframe (this is just an example to demonstrate the issue). Spark moves the coalesce(1) up the plan, so the UDF is applied to a dataframe containing only 1 partition, thus destroying parallelism (interestingly, repartition(1) does not behave this way).

To generalize: this behavior occurs whenever I want to increase parallelism in one part of my transformation, but decrease parallelism afterwards.

I've found one workaround, which consists of caching the dataframe and then triggering its complete evaluation:

val df = sparkContext.parallelize((1 to 500).map(i => scala.util.Random.nextDouble), 100).toDF("value")

val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

val df_result = df
  .withColumn("udfResult", expensiveUDF($"value"))
  .cache

df_result.rdd.count // trigger computation

df_result
  .coalesce(1)
  .write.saveAsTable(tablename)

My question is: is there another way to tell Spark not to decrease parallelism in such cases?

Raphael Roth
  • In a nutshell, you want to *instantiate* a RDD with 500 partitions, then *instantiate* another one to merge the results into just 1 partition so that you can save it into a single file -- cf. https://stackoverflow.com/questions/31383904/how-can-i-force-spark-to-execute-code >> wild guess: maybe a simple call to `getNumPartitions()` would be sufficient to force instantiation, without having to actually scan the result with `count()`... – Samson Scharfrichter Jun 12 '17 at 09:55
  • @SamsonScharfrichter no, calling `getNumPartitions()` is not sufficient and does not prevent the coalesce from being "pushed up" – Raphael Roth Jun 12 '17 at 15:06
  • Coincidence: I just stumbled on that presentation, from the recent Spark Summit > https://www.slideshare.net/databricks/why-you-should-care-about-data-layout-in-the-file-system-with-cheng-lian-and-vida-ha/40 – Samson Scharfrichter Jun 12 '17 at 16:30
  • This seems a very contrived example, is there a real case where you've experienced this? How about using something like reduceByKey after the UDF execution to break it up. – Davos Aug 22 '19 at 17:38

1 Answer


Actually this is not caused by SparkSQL's optimizer: SparkSQL doesn't change the position of the Coalesce operator, as the executed plan shows:

Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
   +- *SerializeFromObject [input[0, double, false] AS value#2]
      +- Scan ExternalRDDScan[obj#1]

I quote a paragraph from the coalesce API's description:

Note: this paragraph was added by the jira SPARK-19399, so it is not found in the 2.0 API docs.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The coalesce API doesn't perform a shuffle; instead it creates a narrow dependency between the previous RDD and the current RDD. Since RDDs are lazily evaluated, the computation is actually performed on the already-coalesced partitions.
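The difference can be sketched without Spark at all. The toy model below is my own (names like `Pipeline`, `mapTasks` and the task-id bookkeeping are not Spark APIs): with a narrow dependency, the single downstream task pulls from every parent pipeline, so the fused map also runs inside that single task; with a shuffle boundary, each parent pipeline runs as its own task before the merge.

```scala
object NarrowVsShuffle {
  // A "pipeline" computes one source partition's rows inside a given task.
  type Pipeline = Int => Seq[Double]

  private val mapTasks = scala.collection.mutable.Set.empty[Int]

  // 100 source partitions with an "expensive" map fused into each pipeline;
  // the map records which task id actually executed it.
  private def partitions: Seq[Pipeline] =
    (0 until 100).map { _ =>
      (taskId: Int) =>
        Seq.fill(5)(scala.util.Random.nextDouble()).map { d =>
          mapTasks += taskId; d
        }
    }

  // coalesce(1) model: narrow dependency -- a single task pulls from every
  // parent pipeline, so the expensive map also runs in that single task.
  def coalesceTaskCount(): Int = {
    mapTasks.clear()
    partitions.foreach(pipe => pipe(0))
    mapTasks.size
  }

  // repartition(1) model: shuffle boundary -- each parent pipeline runs as
  // its own task and materializes its output before a final task merges them.
  def repartitionTaskCount(): Int = {
    mapTasks.clear()
    partitions.zipWithIndex.foreach { case (pipe, id) => pipe(id) }
    mapTasks.size
  }

  def main(args: Array[String]): Unit = {
    println(s"coalesce(1): expensive map ran in ${coalesceTaskCount()} task(s)")
    println(s"repartition(1): expensive map ran in ${repartitionTaskCount()} task(s)")
  }
}
```

In this model the "coalesced" run touches exactly one task id, while the "repartitioned" run touches all 100, which is the parallelism difference the quoted paragraph describes.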

To prevent this, you should use the repartition API instead.
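A sketch of the fix, mirroring the question's setup; the local SparkSession, the app name, the overwrite mode, and the `tablename` value are my own assumptions for a self-contained example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder()
  .appName("coalesce-vs-repartition")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val tablename = "udf_result" // hypothetical table name for this sketch

val df = spark.sparkContext
  .parallelize((1 to 500).map(_ => scala.util.Random.nextDouble()), 100)
  .toDF("value")

val expensiveUDF = udf((d: Double) => { Thread.sleep(100); d })

df.withColumn("udfResult", expensiveUDF($"value"))
  .repartition(1) // shuffle boundary: the UDF still runs across 100 tasks
  .write.mode("overwrite").saveAsTable(tablename)
```

Because repartition inserts a shuffle, the UDF stage keeps the 100 upstream partitions and only the post-shuffle write happens in a single task, at the cost of shuffling the data once.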

Alex Moore-Niemi
viirya
  • please could you elaborate the cited paragraph @viirya? – enneppi Feb 28 '19 at 18:39
  • Is this paragraph suggesting that the "computation taking place on fewer nodes" is the UDF execution? It doesn't appear so from the execution plan, unless you are suggesting that the coalesce reduces the number of upstream nodes, which seems illogical. – Davos Aug 22 '19 at 17:31
  • Thank you. This is REALLY helpful. All I knew was "`coalesce()` is more efficient for reducing partitions and `repartition()` is faster for increasing them." Now I actually understand the consequences - pushing repartitions up in my script has given me headaches in the past! – rjurney Nov 01 '21 at 18:02
  • Honest question: is there well and truly a valid reason to have both coalesce and repartition with modern spark (3.0+)? Given the end result is intended to be the same, it makes sense to me that spark should make the decision under the hood which operation to call rather than leaving largely misunderstood footguns laying around. – Brendan Feb 03 '22 at 02:07
  • Do I understand it correctly: even when the execution plan told that `coalesce(1)` was performed last, it was actually performed BEFORE the transformation which added a column? – ZygD Feb 17 '22 at 08:58