
Context

I am processing some data (5 billion rows, ~7 columns) with PySpark on EMR.

The first steps, including some joins, work as expected, up to and including a cache() (MEMORY_AND_DISK_SER). Then I filter one column for nulls and do a count() on this big dataframe.
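For context, the shape of the pipeline is roughly this (a sketch only: the S3 paths, join key, and column names are placeholders, not the real ones):

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder inputs: the real job reads ~5 billion rows of Parquet from S3
    left = spark.read.parquet("s3://my-bucket/left/")
    right = spark.read.parquet("s3://my-bucket/right/")

    # The join is a transformation, so nothing executes here yet
    joined = left.join(right, on="id", how="left")

    # Persist the join result (the question caches with a MEMORY_AND_DISK_SER-style level)
    joined = joined.persist(StorageLevel.MEMORY_AND_DISK)

    # First action that touches every row: filter one column for nulls and count
    null_rows = joined.filter(joined["some_col"].isNull()).count()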

Problem

The count() takes hours and then fails with a 'no connection' error (I do not remember the exact message, but I am more interested in why it is slow than in the final error).

What I noticed

Out of my 256 vCores, one is always at 100% while the rest are idle. The busy one is used by a DataNode JVM.

Configuration

I have 4 r5a.16xlarge instances, each with 4 EBS SSDs.

EMR is supposed to take care of its own configuration, and this is what I see in the Spark UI:

  • spark.emr.default.executor.memory 18971M
  • spark.driver.memory 2048M
  • spark.executor.cores 4

I am setting the following myself (see the sketch after this list):

  • spark.network.timeout: 800s
  • spark.executor.heartbeatInterval: 60s
  • spark.dynamicAllocation.enabled: True
  • spark.dynamicAllocation.shuffleTracking.enabled: True
  • spark.executor.instances: 0
  • spark.default.parallelism: 128
  • spark.shuffle.spill.compress: true
  • spark.shuffle.compress: true
  • spark.rdd.compress: true
  • spark.storage.level: MEMORY_AND_DISK_SER
  • spark.executor.extraJavaOptions: -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Duser.timezone=GMT
  • spark.driver.extraJavaOptions: -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Duser.timezone=GMT
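A minimal sketch of how these settings might be applied, assuming they are set at session creation time (on EMR they could equally be spark-submit --conf flags or go into a spark-defaults configuration classification; the resource and JVM options are omitted for brevity):

    from pyspark.sql import SparkSession

    # Sketch only: the same settings as listed above, applied when the session is built
    spark = (
        SparkSession.builder
        .config("spark.network.timeout", "800s")
        .config("spark.executor.heartbeatInterval", "60s")
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
        .config("spark.executor.instances", "0")
        .config("spark.default.parallelism", "128")
        .config("spark.shuffle.spill.compress", "true")
        .config("spark.shuffle.compress", "true")
        .config("spark.rdd.compress", "true")
        .getOrCreate()
    )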

Question

What am I doing wrong, or what do I not understand properly? Counting a cached dataframe that was built in 10 minutes, even with a filter on nulls, should not take hours.

Some more details

The data source is on S3, as homogeneous Parquet files. Reading those always works fine, since the join succeeds. During the count(), I see 200 tasks; 195 succeed within a few seconds, but 5 consistently never complete. The hanging tasks are all NODE_LOCAL (though some NODE_LOCAL tasks do complete).

Guillaume
1 Answer


It's a bit hard to tell what is going wrong without looking at the resource manager.

But first, make sure that you are not 'measuring' success on non-action APIs (cache() is not an action, and neither are joins and the other transformations).
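To illustrate with hypothetical dataframes: the join and cache() are lazy, so all of the work (including filling the cache) only happens when the first action runs:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Tiny hypothetical dataframes just to show which calls are lazy
    left = spark.createDataFrame([(1, "a"), (2, None)], ["id", "some_col"])
    right = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "other_col"])

    joined = left.join(right, on="id", how="left")   # transformation: no job runs
    cached = joined.cache()                          # also lazy: only marks the plan for caching

    # The first action materializes the whole lineage: join, cache fill, filter, count
    n = cached.filter(cached["some_col"].isNull()).count()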

My best shot would be the Spark configuration. I would run in cluster mode with these settings (collected into a sketch after the list):

  • spark.default.parallelism=510
  • --num-executors=13 (or spark.executor.instances)
  • spark.executor.cores = spark.driver.cores = 5
  • spark.executor.memory = spark.driver.memory = 36g
  • spark.driver.memoryOverhead=4g
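For reference, those numbers in PySpark session-builder form (a sketch; in cluster mode they are normally given to spark-submit instead, and spark.executor.instances stands in for --num-executors):

    from pyspark.sql import SparkSession

    # Sketch of the suggested settings; in cluster mode pass these to spark-submit
    # (e.g. as --conf flags) rather than setting them in code
    spark = (
        SparkSession.builder
        .config("spark.default.parallelism", "510")
        .config("spark.executor.instances", "13")   # equivalent of --num-executors=13
        .config("spark.executor.cores", "5")
        .config("spark.driver.cores", "5")
        .config("spark.executor.memory", "36g")
        .config("spark.driver.memory", "36g")
        .config("spark.driver.memoryOverhead", "4g")
        .getOrCreate()
    )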

I think the problem is the spark.executor.instances configuration, which you have set to 0.

Otherwise, these settings (from the official AWS guide) are close to optimal for the AWS instance types you are using.

  • Thanks! I'm a bit worried about setting memory and cores myself - isn't EMR supposed to figure that out by itself, based on the instance type? That was my goal - set up a dynamic cluster that can be used for different workloads via automagic configuration. – Guillaume Jul 18 '22 at 06:43
  • The reason for spark.executor.instances = 0 stems from https://stackoverflow.com/a/34000524/2473382 , and yes indeed, I am running in cluster-mode. – Guillaume Jul 18 '22 at 06:46