I'm getting the same error as in "Missing an output location for shuffle" when joining big dataframes in Spark SQL. The recommendation there is to set MEMORY_AND_DISK and/or spark.shuffle.memoryFraction 0. However, spark.shuffle.memoryFraction is deprecated in Spark >= 1.6.0, and setting MEMORY_AND_DISK shouldn't help if I'm not caching any RDD or DataFrame, right? I'm also getting lots of other WARN logs and task retries, which lead me to think that the job is not stable.

Therefore, my question is:

  • What are best practices to join huge dataframes in Spark SQL >= 1.6.0?

More specific questions are:

  • How to tune number of executors and spark.sql.shuffle.partitions to achieve better stability/performance?
  • How to find the right balance between level of parallelism (num of executors/cores) and number of partitions? I've found that increasing the num of executors is not always the solution as it may generate I/O reading time out exceptions because of network traffic.
  • Is there any other relevant parameter to be tuned for this purpose?
  • My understanding is that joining data stored as ORC or Parquet offers better performance than text or Avro for join operations. Is there a significant difference between Parquet and ORC?
  • Is there an advantage of SQLContext vs HiveContext regarding stability/performance for join operations?
  • Is there a difference regarding performance/stability when the dataframes involved in the join were previously registered with registerTempTable() or saved with saveAsTable()?

So far I'm using this answer and this chapter as a starting point, and there are a few more Stack Overflow pages related to this subject. Yet I haven't found a comprehensive answer to this popular issue.

Thanks in advance.

ZygD
leo9r
  • This answer recommends setting spark.sql.shuffle.partitions above 2000 when there are shuffle memory issues, as Spark uses a different data structure for shuffle bookkeeping when the number of partitions is greater than that threshold: http://stackoverflow.com/a/36459198/2482894 – leo9r Jun 23 '16 at 21:50
  • Setting spark.yarn.executor.memoryOverhead=1024 is recommended in this answer: http://stackoverflow.com/a/33118489/2482894 – leo9r Jun 23 '16 at 21:52

1 Answer

That is a lot of questions. Allow me to answer them one by one:

The number of executors is usually variable in a production environment, since it depends on the available resources. The number of partitions matters when you are performing shuffles. Assuming that your data is not skewed, you can lower the load per task by increasing the number of partitions. A task should ideally take a couple of minutes. If a task takes too long, your container may get pre-empted and the work is lost. If a task takes only a few milliseconds, the overhead of starting the task becomes dominant.
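The trade-off above can be turned into a rough starting value for spark.sql.shuffle.partitions. This is only a sketch: the helper name estimate_shuffle_partitions, the 128 MiB target per task, and the example job size are assumptions for illustration, not Spark defaults or values from the question.

```python
import math


def estimate_shuffle_partitions(shuffle_bytes, total_cores,
                                target_partition_bytes=128 * 1024 ** 2):
    """Suggest a value for spark.sql.shuffle.partitions.

    target_partition_bytes is an assumed rule of thumb, not a Spark default.
    """
    # Enough partitions that each task handles about target_partition_bytes.
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Round up to whole "waves" of tasks so no core sits idle in the last wave.
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


# Hypothetical job: 500 GiB shuffled across 40 executor cores in total.
partitions = estimate_shuffle_partitions(500 * 1024 ** 3, total_cores=40)
print(partitions)
```

The result can then be applied with sqlContext.setConf("spark.sql.shuffle.partitions", str(partitions)) in Spark 1.6, or spark.conf.set(...) on a SparkSession in Spark 2.x. It is only a first guess: check the task durations in the Spark UI and adjust, and keep in mind that more partitions will not help if a few keys hold most of the data.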

For the level of parallelism and tuning your executor sizes, I would like to refer you to the excellent guide by Cloudera: https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
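As a rough illustration of the kind of arithmetic executor sizing on YARN involves, here is a minimal sketch. Every default in it is an assumption made for the example: one core and 1 GB per node reserved for the OS and daemons, five cores per executor, a 7% memory overhead fraction, and one executor slot left free for the application master. The function name size_executors and the cluster shape are hypothetical as well.

```python
def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   cores_per_executor=5, overhead_fraction=0.07,
                   reserved_cores=1, reserved_mem_gb=1):
    """Derive spark-submit sizing flags from a cluster shape (all defaults assumed)."""
    # Leave some capacity on every node for the OS and Hadoop daemons.
    usable_cores = cores_per_node - reserved_cores
    usable_mem_gb = mem_gb_per_node - reserved_mem_gb
    executors_per_node = usable_cores // cores_per_executor
    # Keep one executor-sized slot free for the YARN application master.
    num_executors = nodes * executors_per_node - 1
    # Each container must hold the executor heap plus off-heap overhead.
    container_gb = usable_mem_gb / executors_per_node
    executor_memory_gb = int(container_gb * (1 - overhead_fraction))
    return {
        "num_executors": num_executors,
        "executor_cores": cores_per_executor,
        "executor_memory_gb": executor_memory_gb,
    }


# Hypothetical cluster: 6 worker nodes with 16 cores and 64 GB each.
sizing = size_executors(nodes=6, cores_per_node=16, mem_gb_per_node=64)
print(sizing)
```

The three values map onto the --num-executors, --executor-cores and --executor-memory options of spark-submit. Fewer, larger executors reduce the number of network connections during a shuffle, which is one way to address the I/O timeouts mentioned in the question, at the cost of longer garbage-collection pauses per executor.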

ORC and Parquet only encode the data at rest. When doing the actual join, the data is in Spark's in-memory format. Parquet has become more popular since Netflix and Facebook adopted it and put a lot of effort into it. Parquet allows you to store the data more efficiently and has some optimisations (such as predicate pushdown) that Spark uses.

You should use the SQLContext instead of the HiveContext, since the HiveContext is deprecated. The SQLContext is more general and doesn't only work with Hive.

registerTempTable registers the DataFrame under a name within the SparkSession. This doesn't affect the execution of the join: what is stored is only the execution plan, which gets invoked when an action is performed (for example saveAsTable). When performing a saveAsTable, the data gets stored on the distributed file system.

Hope this helps. I would also suggest watching our talk at the Spark Summit about doing joins: https://www.youtube.com/watch?v=6zg7NTw-kTQ. This might provide you with some insights.

Cheers, Fokko

Fokko Driesprong
  • I have been looking for you ever since I saw your presentation on Iterative Broadcast join. That was really nice. I was able to perform sort merge join with two large dataframe in batches and seems working fine. link below https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497#53720497 .. – vikrant rana May 13 '19 at 11:07
  • I wanted to use the same technique with iterative broadcast join but was not able to get that how to clear broadcast partition from memory before we broadcast the next batch of small dataframe. Could you please give your valuable inputs for below question. see link below https://stackoverflow.com/questions/53784272/broadcast-hash-join-iterative/56092295#56092295 .. Any help would be highly appreciated. Thanks – vikrant rana May 13 '19 at 11:07
  • any help! I am struggling to code the iterative broadcast hash join – vikrant rana May 16 '19 at 07:59