I was looking for a consolidated place where I could see, at a high level, all the parameters that need to be tuned in a Spark job to get better performance out of the cluster, assuming you have allocated sufficient nodes. I did go through the link below, but it is too much to process in one go: https://spark.apache.org/docs/2.4.5/configuration.html#available-properties

I have listed my findings below; they should help people know what to look at first, and for which use case, before deep diving into the above link.

1 Answer

Below is a list of parameters which I found helpful in tuning the job. I will keep appending to it whenever I find a use case for a parameter.

Parameter and what to look for:
spark.scheduler.mode: FAIR or FIFO. This decides how you want to share executors across concurrent jobs.
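For illustration, a minimal Scala sketch of setting the scheduler mode; the pool name "etl" is just a placeholder, not something from this answer:

```scala
import org.apache.spark.sql.SparkSession

// FAIR lets concurrent jobs in the same application share resources
// instead of queuing behind each other (FIFO is the default).
val spark = SparkSession.builder()
  .appName("fair-scheduling-sketch")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Optional: jobs submitted from this thread go to a named pool.
// The pool name "etl" is a placeholder.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl")
```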
executor-memory: Check for OOM in the executors. If you find they are going OOM, this is probably the value to raise; also check the executor-cores value, since too many concurrent tasks per executor share the same memory and put load on the executors.
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
driver-memory: If you are doing a collect-type operation (i.e. any operation that sends data back to the driver), then look at tuning this value.
executor-cores: Its value really depends on what kind of processing you are doing: a multi-threaded approach or lightweight processes. The doc below can help you understand it better:
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
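As a rough Scala sketch, the sizing settings above would normally be passed to spark-submit (--executor-memory, --driver-memory, --executor-cores); the values here are placeholders, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder sizing values, not recommendations. In practice these are
// usually supplied via spark-submit or spark-defaults.conf; spark.driver.memory
// set here only takes effect when the driver JVM has not started yet (cluster mode).
val spark = SparkSession.builder()
  .appName("sizing-sketch")
  .config("spark.executor.memory", "8g")  // raise if executors hit OOM
  .config("spark.executor.cores", "4")    // concurrent tasks per executor
  .config("spark.driver.memory", "4g")    // matters for collect()-style results
  .getOrCreate()
```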
spark.default.parallelism: This helped us quite a bit in reducing job execution time. Initially, run the job without this value and observe what default the cluster sets (it is based on cluster size). If you see that the value is too high, try to reduce it; we generally reduced it using the logic below:

(number of max core nodes * number of threads per machine) + (number of max on-demand nodes * number of threads per machine) + (number of max spot nodes * number of threads per machine)
spark.sql.shuffle.partitions: This value is used when your job does a lot of shuffling of data, e.g. a DataFrame with cross joins, or an inner join when it is not repartitioned on the join clause.
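A small Scala sketch applying the parallelism logic above to both spark.default.parallelism and spark.sql.shuffle.partitions; the node counts and threads per machine are made-up placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder cluster shape (assumptions, not values from this answer).
val maxCoreNodes      = 4
val maxOnDemandNodes  = 6
val maxSpotNodes      = 10
val threadsPerMachine = 8

// The formula above: each node group times threads per machine, summed.
val parallelism =
  (maxCoreNodes + maxOnDemandNodes + maxSpotNodes) * threadsPerMachine

val spark = SparkSession.builder()
  .appName("parallelism-sketch")
  .config("spark.default.parallelism", parallelism.toString)    // RDD operations
  .config("spark.sql.shuffle.partitions", parallelism.toString) // DataFrame shuffles
  .getOrCreate()
```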
Dynamic executor allocation: This saved us quite a bit of the pain of allocating the exact number of executors to the job. Try to tune the settings below (a combined sketch follows the documentation link):
spark.dynamicAllocation.minExecutors: This many executors are needed to start your application; otherwise it will not start. This is quite helpful when you don't want your job to crawl along on the 1 or 2 executors that happen to be available.
spark.dynamicAllocation.maxExecutors: The maximum number of executors that can be used, so that the job does not end up consuming all cluster resources in case it is a multi-job cluster running parallel jobs.
spark.dynamicAllocation.initialExecutors: This is quite helpful when the driver does some initial work before handing jobs to the executors, e.g. listing the files in a folder so that only those files are deleted at the end of the job. You can keep minExecutors low but still get a head start on acquiring executors, knowing that the driver is going to take some time before it has work for them.
spark.dynamicAllocation.executorIdleTimeout: This is also helpful in the above-mentioned case, where the driver is doing some work and has nothing to assign to the executors yet, and you don't want them to time out, causing a reallocation of executors which takes some time.
https://spark.apache.org/docs/2.4.5/configuration.html#dynamic-allocation
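A combined Scala sketch of the dynamic allocation settings above; the counts and timeout are placeholders, and on Spark 2.4.x the external shuffle service also needs to be enabled:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder executor counts and timeout. On Spark 2.4.x, dynamic
// allocation also requires the external shuffle service on each node.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "4")
  .config("spark.dynamicAllocation.maxExecutors", "40")
  .config("spark.dynamicAllocation.initialExecutors", "10")
  .config("spark.dynamicAllocation.executorIdleTimeout", "120s")
  .getOrCreate()
```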
Reducing the number of files created while writing the partitions: As our data is processed by different executors, while writing each executor will write its own file. This ends up creating a large number of small files, and in turn querying them becomes heavy. There are two ways to handle it (a sketch follows the notes below):
coalesce: This will try to do minimal shuffling across the executors and will create files of uneven size.
repartition: This will do a full shuffle of the data and creates files of roughly equal size.
https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce
maxRecordsPerFile: This parameter is helpful for telling Spark how many records per file you are looking for.
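A Scala sketch of the three options; it assumes a SparkSession `spark` is in scope, and the DataFrame and output paths are placeholders:

```scala
// Placeholder DataFrame and output paths; assumes a SparkSession `spark` is in scope.
val df = spark.range(0, 1000000).toDF("id")

// coalesce: minimal shuffling, but file sizes can be uneven
df.coalesce(10).write.mode("overwrite").parquet("/tmp/out_coalesce")

// repartition: full shuffle, files of roughly equal size
df.repartition(10).write.mode("overwrite").parquet("/tmp/out_repartition")

// maxRecordsPerFile: cap the number of records written to each file
df.write
  .option("maxRecordsPerFile", 100000)
  .mode("overwrite")
  .parquet("/tmp/out_max_records")
```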
Joining a small DF with a large DF: Check whether you can broadcast the small DF. By default Spark uses the sort-merge join, but if your table is quite small in size, see if you can broadcast it.
https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c
How one can hint Spark to use broadcasting: https://stackoverflow.com/a/37487575/814074
The parameters you need to look at for broadcast joins are listed below (a sketch follows them):
spark.sql.autoBroadcastJoinThreshold: This helps Spark decide, for a given size of DF, whether to use a broadcast join or not.
spark.driver.maxResultSize: The maximum size of the result that will be returned to the driver so it can broadcast it.
driver-memory: As the driver is doing the broadcasting of the result, this needs to be bigger.
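A Scala sketch of a broadcast join; the threshold value and DataFrames are placeholders, assuming a SparkSession `spark` is in scope:

```scala
import org.apache.spark.sql.functions.broadcast

// Placeholder DataFrames; assumes a SparkSession `spark` is in scope.
val largeDf = spark.range(0, 10000000).toDF("id")
val smallDf = spark.range(0, 100).toDF("id")

// Threshold in bytes below which Spark auto-broadcasts a table (-1 disables it).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// Or hint the broadcast explicitly on the small side of the join.
val joined = largeDf.join(broadcast(smallDf), Seq("id"))
```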
spark.network.timeout, spark.executor.heartbeatInterval: These help in the case where you see abrupt termination of executors by the driver. There could be multiple reasons, but if nothing specific is found you can check these parameters.
https://spark.apache.org/docs/2.4.5/configuration.html#execution-behavior
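A small Scala sketch with placeholder values; spark.network.timeout should stay well above spark.executor.heartbeatInterval:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder values; keep spark.network.timeout well above
// spark.executor.heartbeatInterval so slow heartbeats are not
// mistaken for dead executors.
val spark = SparkSession.builder()
  .appName("timeout-sketch")
  .config("spark.network.timeout", "600s")
  .config("spark.executor.heartbeatInterval", "60s")
  .getOrCreate()
```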
Data is skewed across customers: Key salting is one way to handle it; also try to find a way to trigger the jobs in descending order of volume per customer. This ensures the cluster is well occupied during the initial run, and the long-running customers get a head start while the small customers finish their jobs. Also, you can drop a customer if no data is present for that customer, to reduce the load on the cluster. A key-salting sketch follows.
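A Scala sketch of key salting for a skewed join; the column names, salt range, and placeholder DataFrames are assumptions, with a SparkSession `spark` in scope:

```scala
import org.apache.spark.sql.functions._

// Placeholder data and column names; assumes a SparkSession `spark` is in scope.
// skewedDf is large and skewed on "customer_id"; dimDf is the small side.
val skewedDf = spark.range(0, 1000000)
  .withColumn("customer_id", (col("id") % 5).cast("string"))
val dimDf = spark.createDataFrame(Seq(("0", "A"), ("1", "B"), ("2", "C")))
  .toDF("customer_id", "name")

val saltBuckets = 16

// Add a random salt to the skewed side so one hot key spreads over many partitions.
val saltedLeft = skewedDf.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate the small side once per salt value so every salted key still matches.
val saltedRight = dimDf.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

// Join on the original key plus the salt, then drop the helper column.
val joined = saltedLeft
  .join(saltedRight, Seq("customer_id", "salt"))
  .drop("salt")
```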