spark.scheduler.mode
FAIR or FIFO. This decides how you want to allocate executors to the jobs.
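If you want to set this from inside the application, a minimal sketch looks like the below (the app name and the pool name "etl" are hypothetical; the pool would have to be defined in a fairscheduler.xml):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable FAIR scheduling for jobs submitted within this application.
val spark = SparkSession.builder()
  .appName("fair-scheduling-example")
  .config("spark.scheduler.mode", "FAIR")  // default is FIFO
  .getOrCreate()

// Optionally pin this thread's jobs to a named pool
// ("etl" is hypothetical and would be defined in a fairscheduler.xml).
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "etl")
```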
executor-memory
Check for OOM in the executors: if you find they are going OOM, this is probably the value to tune. Also check the executor-cores value, whether it is too small and causing extra load on the executors (a combined sizing sketch follows after executor-cores). https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
driver-memory
If you are doing a collect kind of operation (i.e. any operation that sends data back to the driver), then look at tuning this value (see the sizing sketch after executor-cores).
executor-cores
Its value really depends on what kind of processing you are looking at: is it a multi-threaded approach or a light process? The doc below can help you understand it better. https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html
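A minimal combined sketch for executor-memory, driver-memory and executor-cores (the 8g/4g/4 values are illustrative placeholders, not recommendations; these three are normally passed as spark-submit flags, which is equivalent):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the sizes are illustrative placeholders, not tuned recommendations.
// Note: driver memory must be set at launch time (e.g. spark-submit --driver-memory 4g,
// or spark-defaults.conf); setting it here after the driver JVM is already up has no effect.
val spark = SparkSession.builder()
  .appName("resource-sizing-example")
  .config("spark.executor.memory", "8g")   // raise this if executors go OOM
  .config("spark.executor.cores", "4")     // concurrent tasks per executor
  .getOrCreate()
```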
spark.default.parallelism
This helped us quite a bit in reducing job execution time. Initially, run the job without this value and observe what default the cluster sets (it is based on the cluster size). If you see that the value is too high, try to reduce it. We generally reduced it using the logic below:
number of max core nodes * number of threads per machine + number of max on-demand nodes * number of threads per machine + number of max spot nodes * number of threads per machine
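A purely illustrative worked example of that formula, assuming a made-up cluster of 4 core nodes, 6 on-demand nodes, 10 spot nodes, and 8 threads per machine:

```scala
import org.apache.spark.sql.SparkSession

// Made-up cluster: 4 core nodes, 6 on-demand nodes, 10 spot nodes, 8 threads per machine.
val threadsPerMachine = 8
val parallelism = (4 * threadsPerMachine) + (6 * threadsPerMachine) + (10 * threadsPerMachine)  // = 160

// spark.default.parallelism needs to be in place before the context is created.
val spark = SparkSession.builder()
  .appName("parallelism-example")
  .config("spark.default.parallelism", parallelism.toString)
  .getOrCreate()
```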
spark.sql.shuffle.partitions
This value is used when your job does quite a bit of shuffling of data, e.g. a DF with cross joins, or an inner join when it is not repartitioned on the join clause.
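This one can be changed at runtime per job; a minimal sketch (400 is an illustrative value, and ordersDf/customersDf are hypothetical DataFrames):

```scala
// Sketch: bump the shuffle partition count before a shuffle-heavy join or aggregation.
// 400 is an illustrative value; the default is 200.
spark.conf.set("spark.sql.shuffle.partitions", "400")

// Any join/aggregation after this point shuffles into 400 partitions, e.g.
// (ordersDf and customersDf are hypothetical DataFrames):
val joined = ordersDf.join(customersDf, Seq("customer_id"))
```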
dynamic executor allocation
This saved us quite a bit of the pain of allocating the exact number of executors to the job. Try to tune the parameters below (a combined sketch follows this list).
spark.dynamicAllocation.minExecutors
This many executors are needed to start your application, else it will not start. This is quite helpful when you don't want your job to crawl along on the 1 or 2 executors that happen to be available.
spark.dynamicAllocation.maxExecutors
The maximum number of executors that can be used, to ensure the job does not end up consuming all cluster resources in case it is a multi-job cluster running parallel jobs.
spark.dynamicAllocation.initialExecutors
This is quite helpful when the driver is doing some initial work before handing jobs to the executors, e.g. listing the files in a folder so that only those files are deleted at the end of the job. It lets you keep minExecutors low but still get a head start, knowing that the driver is going to take some time before it starts spawning work to the executors.
spark.dynamicAllocation.executorIdleTimeout
This is also helpful in the above-mentioned case, where the driver is doing some work and has nothing to assign to the executors yet, and you don't want them to time out, causing reallocation of executors, which takes some time. https://spark.apache.org/docs/2.4.5/configuration.html#dynamic-allocation
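A combined sketch of the dynamic-allocation settings above (all numbers are illustrative; note that on YARN, dynamic allocation generally also needs the external shuffle service enabled, which is why spark.shuffle.service.enabled appears here even though it is not in the list above):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the numbers below are placeholders, not recommendations.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-example")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")               // usually required on YARN
  .config("spark.dynamicAllocation.minExecutors", "4")           // don't crawl on 1-2 executors
  .config("spark.dynamicAllocation.maxExecutors", "50")          // don't starve other jobs
  .config("spark.dynamicAllocation.initialExecutors", "10")      // head start while the driver does its setup
  .config("spark.dynamicAllocation.executorIdleTimeout", "300s") // keep idle executors around longer
  .getOrCreate()
```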
Trying to reduce the number of files created while writing the partitions
As our data is read by different executors, each executor will write its own file while writing the output. This ends up creating a large number of small files, and in turn queries on those files become heavy. There are two ways to do it (a sketch follows this list):
Coalesce: This tries to do the minimum shuffle across the executors and creates unevenly sized files.
Repartition: This does a full shuffle of the data and creates files of roughly equal size. https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce
maxRecordsPerFile: This parameter is helpful in telling Spark how many records per file you are looking for.
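A minimal sketch of the three options, assuming a hypothetical DataFrame df and illustrative output paths and numbers:

```scala
// Option 1: coalesce - minimal shuffling across executors, but file sizes can be uneven.
df.coalesce(50).write.mode("overwrite").parquet("s3://my-bucket/output-coalesced")

// Option 2: repartition - full shuffle, files of roughly equal size.
df.repartition(50).write.mode("overwrite").parquet("s3://my-bucket/output-repartitioned")

// Option 3: cap the number of records per output file instead of fixing the partition count.
df.write
  .option("maxRecordsPerFile", 1000000L)
  .mode("overwrite")
  .parquet("s3://my-bucket/output-capped")
```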
When you are joining a small DF with a large DF
Check if you can use broadcasting of the small DF. By default Spark uses the sort-merge join, but if your table is quite small in size, see if you can broadcast it. https://towardsdatascience.com/the-art-of-joining-in-spark-dcbd33d693c How one can hint Spark to use broadcasting: https://stackoverflow.com/a/37487575/814074 The parameters you need to look at for doing broadcast joins are below (a sketch follows this list):
spark.sql.autoBroadcastJoinThreshold
This helps Spark understand, for a given size of DF, whether to use a broadcast join or not.
spark.driver.maxResultSize
The maximum size of the result that will be returned to the driver so it can broadcast it.
driver-memory
As the driver is doing the broadcasting of the result, this needs to be bigger.
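A minimal sketch of both routes to a broadcast join, with hypothetical largeDf/smallDf DataFrames, a hypothetical customer_id join key, and an illustrative 50 MB threshold:

```scala
import org.apache.spark.sql.functions.broadcast

// Option 1: raise the automatic threshold (value in bytes; -1 disables auto-broadcast),
// so Spark broadcasts any table it estimates to be smaller than ~50 MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// Option 2: hint the broadcast explicitly, regardless of the threshold.
val joined = largeDf.join(broadcast(smallDf), Seq("customer_id"))
```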
spark.network.timeout, spark.executor.heartbeatInterval
These help in the case where you see abrupt termination of executors by the driver. There could be multiple reasons, but if nothing specific is found, you can check these parameters. https://spark.apache.org/docs/2.4.5/configuration.html#execution-behavior
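A minimal sketch with illustrative values (the Spark docs note that the heartbeat interval should stay significantly below the network timeout):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: illustrative values. Keep network.timeout well above heartbeatInterval.
val spark = SparkSession.builder()
  .appName("timeout-example")
  .config("spark.network.timeout", "600s")            // default is 120s
  .config("spark.executor.heartbeatInterval", "60s")  // default is 10s
  .getOrCreate()
```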
Data is skewed across customers
Key salting is one way (a sketch follows). Along with that, try to find a way to trigger the jobs in descending order of volume per customer. This ensures that the cluster is well occupied during the initial run and the long-running customers get a head start while the small customers are completing their jobs. Also, you can drop a customer if no data is present for that customer, to reduce the load on the cluster.
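A minimal sketch of key salting for a skewed join, using hypothetical DataFrames skewedDf (large, skewed on customer_id) and dimDf (the smaller side) and 10 salt buckets:

```scala
import org.apache.spark.sql.functions.{col, concat_ws, explode, lit, rand, sequence}

val saltBuckets = 10  // illustrative; size it to how skewed the heavy keys are

// Spread each hot customer_id on the skewed side across `saltBuckets` random salts.
val saltedLeft = skewedDf
  .withColumn("salt", (rand() * saltBuckets).cast("int"))
  .withColumn("join_key", concat_ws("_", col("customer_id"), col("salt")))

// Duplicate every row on the other side once per salt value so all salted keys find a match.
val saltedRight = dimDf
  .withColumn("salt", explode(sequence(lit(0), lit(saltBuckets - 1))))
  .withColumn("join_key", concat_ws("_", col("customer_id"), col("salt")))

val joined = saltedLeft.join(saltedRight, "join_key")
```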