
What is the formula that Spark uses to calculate the number of reduce tasks?

I am running a couple of Spark SQL queries and the number of reduce tasks is always 200. The number of map tasks for these queries is 154. I am on Spark 1.4.1.

Is this related to spark.shuffle.sort.bypassMergeThreshold, which defaults to 200?

Uli Bethke

4 Answers


It's spark.sql.shuffle.partitions that you're after. According to the Spark SQL performance tuning guide:

| Property Name                 | Default | Meaning                                        |
+-------------------------------+---------+------------------------------------------------+
| spark.sql.shuffle.partitions  | 200     | Configures the number of partitions to use     |
|                               |         | when shuffling data for joins or aggregations. |

A related option is spark.default.parallelism, which determines the 'default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user'. However, this seems to be ignored by Spark SQL and is only relevant when working on plain RDDs.
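For illustration, here is a minimal sketch of where each setting takes effect, assuming a hypothetical local SparkSession named spark and toy data (adaptive query execution is disabled only to keep the partition counts deterministic):

import org.apache.spark.sql.SparkSession

// Hypothetical local session, just for illustration
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "8")    // used by DataFrame/SQL shuffles
  .config("spark.default.parallelism", "4")       // used by plain RDD shuffles
  .config("spark.sql.adaptive.enabled", "false")  // keep partition counts predictable
  .getOrCreate()

import spark.implicits._

// DataFrame aggregation: the shuffle produces spark.sql.shuffle.partitions partitions
val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("k", "v")
println(df.groupBy("k").count().rdd.getNumPartitions)  // expected: 8

// Plain RDD reduceByKey: the shuffle uses spark.default.parallelism
val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
println(rdd.reduceByKey(_ + _).getNumPartitions)       // expected: 4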

sgvd
  • spark.default.parallelism gives the maximum number of partitions when the data is large enough to need them; otherwise transformations like join, reduceByKey, etc. can produce fewer partitions in their output. – Bhavesh Gadoya Jun 07 '19 at 08:34

Yes, @sgvd, that is the correct parameter. Here is how you set it in Scala:

// Set number of shuffle partitions to 3
sqlContext.setConf("spark.sql.shuffle.partitions", "3")
// Verify the setting 
sqlContext.getConf("spark.sql.shuffle.partitions")
pmhargis

Nowadays, in Spark 2+, you set this parameter as follows:

spark.conf.set("spark.sql.shuffle.partitions", 16)
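For completeness, a small sketch of reading the value back and of the SQL-level equivalent; the name spark is assumed to be an existing SparkSession:

// Set at runtime on an existing SparkSession
spark.conf.set("spark.sql.shuffle.partitions", 16)

// Read the current value back
println(spark.conf.get("spark.sql.shuffle.partitions"))  // prints "16"

// The same setting can also be changed through a SQL statement
spark.sql("SET spark.sql.shuffle.partitions=16")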
Jack

Specifying the minimum and maximum split sizes through mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize should help. These parameters determine the minimum and maximum chunk sizes into which the input files are split.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
    .config("mapreduce.input.fileinputformat.split.minsize", "1073741824")
    .config("mapreduce.input.fileinputformat.split.maxsize", "1073741824")
    .enableHiveSupport()
    .getOrCreate()

Here, the split size has been set to 1 GB (1073741824 bytes). Note that Parquet and Snappy are splittable, while gzip and LZO are not.
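Whether these Hadoop split settings take effect depends on how the data is read (they apply to Hadoop/Hive input formats, while Spark's native file sources use spark.sql.files.maxPartitionBytes instead), so it is worth checking the resulting partition count. A small sketch with a hypothetical table name:

// Hypothetical Hive table name; adjust to your data
val df = spark.table("events")

// Number of input partitions, i.e. roughly the number of map tasks for the scan
println(df.rdd.getNumPartitions)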

Rounak Datta