Context
I am processing some data (5 billion rows, ~7 columns) with PySpark on EMR.
The first steps, including some joins, work as expected, up to and including a cache()
(MEMORY_AND_DISK_SER). Then I filter one column for nulls and do a count() on this big dataframe.
Problem
It takes hours and then fails with a 'no connection' error (I do not remember the exact message, but I am more interested in why it is slow than in the final error).
What I noticed
Out of my 256 vCores, 1 is always at 100% while the rest are idle. The one at 100% is used by a DataNode JVM.
Configuration
I have 4 r5a.16xlarge instances, each with 4 EBS SSDs.
EMR is supposed to take care of its own configuration, and this is what I see in the Spark UI:
- spark.emr.default.executor.memory 18971M
- spark.driver.memory 2048M
- spark.executor.cores 4
I am setting these myself:
- spark.network.timeout: 800s
- spark.executor.heartbeatInterval: 60s
- spark.dynamicAllocation.enabled: True
- spark.dynamicAllocation.shuffleTracking.enabled: True
- spark.executor.instances: 0
- spark.default.parallelism: 128
- spark.shuffle.spill.compress: true
- spark.shuffle.compress: true
- spark.rdd.compress: true
- spark.storage.level: MEMORY_AND_DISK_SER
- spark.executor.extraJavaOptions: -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Duser.timezone=GMT
- spark.driver.extraJavaOptions: -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Duser.timezone=GMT
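For completeness, a sketch of how the settings above would be passed on submission (job.py and the trimmed JVM options are placeholders, not my exact command):

```shell
# Sketch of a spark-submit invocation with the settings listed above.
spark-submit \
  --conf spark.network.timeout=800s \
  --conf spark.executor.heartbeatInterval=60s \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.default.parallelism=128 \
  --conf spark.shuffle.spill.compress=true \
  --conf spark.shuffle.compress=true \
  --conf spark.rdd.compress=true \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -Duser.timezone=GMT" \
  job.py
```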
Question
What am I doing wrong, or what do I not understand properly? Counting a cached dataframe that was built in 10 minutes, even when filtering out nulls, should not take hours.
Some more details
The data source is homogeneous parquet files on S3. But reading those always works fine, because the join succeeds. During the count(), I see 200 tasks; 195 succeed within a few seconds, but 5 consistently never complete. All 5 are NODE_LOCAL (though some NODE_LOCAL tasks do complete).