I have a Spark job that reads several TB of data and executes two window functions. The job runs fine on smaller chunks (50k shuffle partitions on 4 TB), but when I scale the input up to 15 TB with 150k-200k shuffle partitions, nodes begin to fail.
This happens for two reasons:
- OOM on the Executors
- Timeout when shuffling
OOM on the Executors
20/07/01 15:58:14 ERROR YarnClusterScheduler: Lost executor 92 on ip-10-102-125-133.ec2.internal: Container killed by YARN for exceeding memory limits. 22.0 GB of 22 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
I have already increased the size of the driver to account for the large shuffle:
spark.driver.memory = 16g
spark.driver.maxResultSize = 8g
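For reference, here is how the driver conf looks as submit-time overrides (just a sketch; the deploy mode matches the YarnClusterScheduler in the log, and the class/jar names are placeholders, not my real job):
spark-submit --deploy-mode cluster \
  --conf spark.driver.memory=16g \
  --conf spark.driver.maxResultSize=8g \
  --class com.example.WindowJob \
  s3://my-bucket/window-job.jar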
The executors are r5.xlarge with the following conf:
spark.executor.cores = 4
spark.executor.memory = 18971M
spark.yarn.executor.memoryOverheadFactor = 0.1875
This is well below the maximum according to AWS: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-hadoop-task-config.html#emr-hadoop-task-config-r5
yarn.nodemanager.resource.memory-mb = 24576
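If I'm computing the container size correctly (assuming YARN requests executor memory plus executor memory times the overhead factor), this conf lands exactly on the 22 GB limit in the error above:
18971 MB + (0.1875 × 18971 MB) ≈ 22528 MB ≈ 22 GB (the container size YARN kills at)
24576 MB − 22528 MB ≈ 2 GB of node memory left unused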
I understand I need to increase spark.yarn.executor.memoryOverheadFactor here to leave room for the large off-heap overhead associated with this many partitions. Hopefully that will be the last change needed there.
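Concretely, the direction I plan to try is trading heap for overhead so the container still fits on the node; something like the following (illustrative numbers, not yet validated):
spark.executor.memory = 16384M
spark.yarn.executor.memoryOverheadFactor = 0.375
That keeps the container at roughly 16384 MB × 1.375 ≈ 22528 MB, still under the 24576 MB node limit, but with about 6 GB of overhead instead of the current ~3.5 GB.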
Shuffle Timeout
20/07/01 15:59:39 ERROR TransportChannelHandler: Connection to ip-10-102-116-184.ec2.internal/10.102.116.184:7337 has been quiet for 600000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
20/07/01 15:59:39 ERROR TransportResponseHandler: Still have 8 requests outstanding when connection from ip-10-102-116-184.ec2.internal/10.102.116.184:7337 is closed
20/07/01 15:59:39 ERROR OneForOneBlockFetcher: Failed while starting block fetches
I've adjusted this timeout as follows:
spark.network.timeout = 600
I can keep increasing spark.network.timeout (600 s is exactly the 600000 ms quiet period in the log above) to silence the error and simply wait longer, but I would rather reduce the Shuffle Read Blocked Time itself, which currently ranges from 1 minute to 30 minutes. Is there a way to increase the communication rate between nodes?
I have tried adjusting the following settings, but can't seem to improve this speed:
spark.reducer.maxSizeInFlight = 512m
spark.shuffle.io.numConnectionsPerPeer = 5
spark.shuffle.io.backLog = 128
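(These go in as further --conf flags on the same spark-submit sketch above, with the timeout given an explicit unit:)
  --conf spark.network.timeout=600s \
  --conf spark.reducer.maxSizeInFlight=512m \
  --conf spark.shuffle.io.numConnectionsPerPeer=5 \
  --conf spark.shuffle.io.backLog=128 \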
What do I need to tune to decrease the Shuffle Read Blocked Time on AWS EMR?