7

I am currently processing the data using spark and foreach partition open a connection to mysql and insert it to the database in a batch of 1000. As mentioned in the SparkDocumentation default value of spark.sql.shuffle.partitions is 200 but i want to keep it dynamic. So, how do i calculate it. Hence, neither choosing very high value causing performance degradation nor choosing very small value causing OOM.

Naresh
  • 5,073
  • 12
  • 67
  • 124

2 Answers2

-1

Try below option -

val numExecutors         = spark.conf.get("spark.executor.instances").toInt

val numExecutorsCores    = spark.conf.get("spark.executor.cores").toInt

val numShufflePartitions = (numExecutors * numExecutorsCores)

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)

This will help you set the right number of shuffle partitions based on executor and executors cores used for your spark job without compromising performance and leading to Out Of Memory issues.

If you still get out of memeory them set below property -

spark.conf.set("spark.executor.memoryOverhead", "3G")

Other option is to calculate Dataframe size and didvie that by hdfs block size and use the resultant number to set spark.sql.shuffle.partitions.

Horai Nuri
  • 5,358
  • 16
  • 75
  • 127
Ajay Ahuja
  • 1,196
  • 11
  • 26
-4

You can use df.repartition(numPartitions) method for doing this. You can take decision based on the input/intermediate output and pass numPartitions to repartition() method.

df.repartition(numPartitions)   or rdd.repartition(numPartitions)