40

I am using Spark SQL (actually hiveContext.sql()) to run group by queries, and I am running into OOM issues. I tried increasing spark.sql.shuffle.partitions from the default of 200 to 1000, but it is not helping.

I believe the shuffle load is shared across these partitions, so the more partitions there are, the less data each one has to hold. I am new to Spark. I am using Spark 1.4.0 and have around 1 TB of uncompressed data to process using hiveContext.sql() group by queries.
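To put rough numbers on that intuition, here is a back-of-the-envelope sketch in plain Scala (no Spark needed). It assumes the 1 TB is spread evenly across the shuffle partitions, which skewed group by keys would violate, and the names `avgPartitionBytes` and `partitionsFor` are made up for illustration.

```scala
object ShufflePartitionEstimate {
  val MiB: Long = 1024L * 1024L
  val TiB: Long = MiB * 1024L * 1024L

  /** Average bytes per shuffle partition, assuming a perfectly even spread. */
  def avgPartitionBytes(totalBytes: Long, numPartitions: Int): Long =
    totalBytes / numPartitions

  /** Smallest partition count that keeps the average at or below targetBytes. */
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int =
    ((totalBytes + targetBytes - 1) / targetBytes).toInt

  def main(args: Array[String]): Unit = {
    // Compare the default (200), the value tried in the question (1000), and 2001.
    for (n <- Seq(200, 1000, 2001)) {
      println(s"$n partitions -> about ${avgPartitionBytes(TiB, n) / MiB} MiB each")
    }
    // The 128 MiB target is an assumed value, chosen only for illustration.
    println(s"Partitions for ~128 MiB each: ${partitionsFor(TiB, 128 * MiB)}")
  }
}
```

Under these assumptions, 200 partitions average roughly 5 GiB each and 1000 partitions still average roughly 1 GiB each, so moving from 200 to 1000 may simply not shrink the partitions enough.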

halfer
Umesh K

4 Answers

57

If you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001.

Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

I really wish they would let you configure this independently.

By the way, I found this information in a Cloudera slide deck.
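As a rough illustration of that threshold, the sketch below mirrors the length check from the snippet in plain Scala. It is a simplified stand-in, not Spark's actual classes: `MapStatusKind`, `statusKindFor`, and the `threshold` parameter are hypothetical names, and the default of 2000 is taken from the code quoted above.

```scala
object MapStatusThresholdSketch {
  // Hypothetical labels standing in for the two MapStatus implementations.
  sealed trait MapStatusKind
  case object Compressed extends MapStatusKind
  case object HighlyCompressed extends MapStatusKind

  /** Mirrors the strict `length > 2000` comparison from the quoted snippet. */
  def statusKindFor(numPartitions: Int, threshold: Int = 2000): MapStatusKind =
    if (numPartitions > threshold) HighlyCompressed else Compressed

  def main(args: Array[String]): Unit = {
    Seq(200, 1000, 2000, 2001).foreach { n =>
      println(s"$n shuffle partitions -> ${statusKindFor(n)}")
    }
  }
}
```

Because the comparison is strict, exactly 2000 partitions stays on the `Compressed` path, which is why 2001 is the smallest value that switches.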

Nick Chammas
nont
  • 3
    This information was accurate before Spark 2.4.x. Have a look at spark.shuffle.minNumPartitionsToHighlyCompress. In short, you can use HighlyCompressedMapStatus with any number of partitions, which lets you choose the optimum level of parallelism without compromise – Ashvjit Singh Dec 11 '20 at 12:23
9

OK, so I think your issue is more general. It's not specific to Spark SQL; it's a general problem with Spark, where it ignores the number of partitions you ask for when there are few files. Spark seems to have the same number of partitions as the number of files on HDFS, unless you call repartition. So calling repartition ought to work, but it has the caveat of causing a somewhat unnecessary shuffle.

I raised this question a while ago and have yet to get a good answer :(

Spark: increase number of partitions without causing a shuffle?

Community
samthebest
  • Is there a reference to support the claim that "Spark seems to have the same number of partitions as the number of files on HDFS"? – morpheus May 19 '17 at 03:47
  • @morpheus Running Spark jobs on HDFS for about 3 years :P But yes I'd like to see a reference too, it's just one of those things I've come to treat as fact without ever seeing any explicit documentation. – samthebest May 19 '17 at 12:09
  • According to [documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html) there is a property `spark.sql.files.maxPartitionBytes` which would dictate the # of partitions when loading data from HDFS – morpheus May 19 '17 at 16:14
  • @morpheus, near the bottom of article https://0x0fff.com/spark-architecture/ is some discussion about how partitioning works when reading in files. – Mark Rajcok Jul 19 '17 at 17:56
  • (making this note to help others who come across this thread): the # of partitions when loading data from HDFS is NOT governed by `spark.sql.files.maxPartitionBytes`. Instead it is dictated by the HDFS block size and can be increased by increasing `mapreduce.job.maps` – morpheus Jul 20 '17 at 19:45
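The block-size rule from the last comment can be sketched as simple arithmetic. This is only an approximation under stated assumptions: files are splittable, each split covers one HDFS block, and the 128 MiB block size is an assumed value that does not come from this thread. `estimateInputPartitions` is a hypothetical helper, not a Spark or Hadoop API.

```scala
object InputPartitionEstimate {
  val MiB: Long = 1024L * 1024L

  /** One partition per block-sized chunk of each file (ceiling division per file). */
  def estimateInputPartitions(fileSizes: Seq[Long], blockSize: Long): Long =
    fileSizes.map(size => (size + blockSize - 1) / blockSize).sum

  def main(args: Array[String]): Unit = {
    val blockSize = 128 * MiB                      // assumed HDFS block size
    val fewLargeFiles = Seq.fill(4)(1024 * MiB)    // 4 files of 1 GiB each
    val manySmallFiles = Seq.fill(100)(10 * MiB)   // 100 files of 10 MiB each

    println(s"Few large files:  ${estimateInputPartitions(fewLargeFiles, blockSize)} partitions")
    println(s"Many small files: ${estimateInputPartitions(manySmallFiles, blockSize)} partitions")
  }
}
```

Under this model, files smaller than one block each contribute a single partition, which would be consistent with the observation that the partition count often matches the number of files.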
4

It actually depends on your data and your query. If Spark must load 1 TB, there is something wrong with your design.

Use the superb web UI to see the DAG, meaning how Spark translates your SQL query into jobs, stages, and tasks.

Useful metrics are "Input" and "Shuffle".

  • Partition your data (Hive / directory layout like /year=X/month=X)
  • Use Spark's CLUSTER BY feature to work per data partition
  • Use the ORC or Parquet file format, because they support filter push-down, so unneeded data is not loaded into Spark
  • Analyze the Spark History to see how Spark is reading data

Also, could the OOM be happening on your driver?

-> This is a separate issue: at the end, the driver collects the data you asked for. If you ask for too much data, the driver will run out of memory. Try limiting your query, or write the result to another table (Spark syntax CREATE TABLE ...AS).

Thomas Decaux
0

I came across this post from Cloudera about Hive Partitioning. Check out the "Pointers" section talking about number of partitions and number of files in each partition resulting in overloading the name node, which might cause OOM.

pbahr