
Consider the following code:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

case class Person(
  personId: Long, name: String, ageGroup: String, gender: String,
  relationshipStatus: String, country: String, state: String
)

case class PerPersonPower(personId: Long, power: Double)

val people: Dataset[Person] = ...          // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ...  // Around 50 million entries.

people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )

It is executed on a cluster with approximately 100 GB of RAM in total, but the cluster runs out of memory, and I am not sure what to do about it. In reality, people is repartitioned by $"personId" and cached -- people.repartition($"personId").cache().
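
For reference, the current pipeline therefore looks roughly like this (a sketch of what I described above; it assumes spark.implicits._ is imported for the $ column syntax, and cachedPeople is just a name I am using here for the repartitioned, cached Dataset):

val cachedPeople = people
  .repartition($"personId")  // shuffle people by personId
  .cache()                   // keep the repartitioned Dataset in executor memory

cachedPeople.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )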

Any ideas how I might optimize this computation?

The cluster is a vanilla Google Dataproc cluster --- so it uses YARN in client mode --- consisting of 14 nodes with 8 GB RAM each.

  • Please share error logs from your run. – morfious902002 Apr 20 '18 at 15:50
  • How big is the dataset you're trying to cache? 14*8GB=112GB of total RAM, 89GB of which goes to YARN. Of that, only some portion can be used for caching -- let's just say 30% -- so you could cache up to 30GB. https://spark.apache.org/docs/latest/tuning.html#memory-management-overview – Karthik Palaniappan Apr 26 '18 at 20:00
  • @morfious902002 – I will get back with the error logs ASAP. Sorry for being absent! – d125q Apr 30 '18 at 07:25

1 Answer


From the limited information available in the question, I can suggest not using cache and creating more shuffle partitions than the default of 200 (this may differ from cluster to cluster): try setting spark.sql.shuffle.partitions in your app to 1000 or 2000 to start with. You can do this with spark.conf.set("spark.sql.shuffle.partitions", 1000). Most likely your query hits a SortMergeJoin, and currently each executor receives more data than its heap (minus the YARN overhead) can hold. Consult the Spark UI for the cluster to monitor and optimize your query execution. In the SQL tab you'll see detailed numbers about how much data is processed in each stage, which will help you identify bottlenecks and fix them faster.
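
For example (a sketch, assuming the SparkSession is available as spark; the exact value is something to tune based on what you observe in the Spark UI):

// Increase the number of shuffle partitions so that each partition is smaller.
spark.conf.set("spark.sql.shuffle.partitions", 1000)

// Drop the cache() call and run the aggregation directly.
people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )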

Spark's query planner will first shuffle and sort both PerPersonPower and Person by personId into the number of partitions defined by spark.sql.shuffle.partitions, writing the intermediate shuffle files to disk, then compute the same number of partial aggregates and combine them into your resulting DataFrame.
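
To confirm which join strategy the planner picks and how the shuffle is laid out, you can print the physical plan (a sketch; the exact output depends on your Spark version):

// Look for SortMergeJoin, Exchange (shuffle), and HashAggregate nodes in the output.
people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(sum("power").alias("totalPower"), count("*").alias("personCount"))
  .explain()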

It seems that you are joining around 18-20 GB of data (people) with about 800 MB (powers). If powers were a bit smaller, you could try to use a BroadcastHashJoin, as in people.join(broadcast(powers), "personId"), though I wouldn't recommend broadcasting DataFrames larger than 128 MB or 256 MB.
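
A sketch of the broadcast variant, assuming powers could be trimmed down far enough (note that spark.sql.autoBroadcastJoinThreshold, 10 MB by default, controls when Spark broadcasts automatically; the explicit hint below asks Spark to broadcast regardless of that threshold):

import org.apache.spark.sql.functions.broadcast

// Ship the smaller side to every executor instead of shuffling both sides.
people.join(broadcast(powers), "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )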

Good luck!

nefo_x
  • Thank you. I will try increasing the number of partitions and see what I get. I will additionally post the logging information. – d125q Apr 30 '18 at 07:27
  • @d125q did increasing the shuffle partitions help in your specific case? Just in case, the logs won't be as helpful for you as the realtime Spark UI (for which you need to set up an SSH tunnel in Dataproc). – nefo_x May 01 '18 at 22:09
  • Not really. I still get lots of `ExecutorLostFailure` warnings, stating `Reason: Container killed by YARN for exceeding memory limits. 12.1 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.`. I will look into monitoring Spark's web UI. – d125q May 08 '18 at 12:19
  • @d125q then I think you need to triple the number of shuffle partitions you're using, as the amount of data in each partition still doesn't fit in the Java heap space: `spark.conf.set("spark.sql.shuffle.partitions", 3000)`. What might be helpful for learning more about the query is the execution plan. Can you add `.explain()` to the end of your dataframe call and update the question with it? – nefo_x May 08 '18 at 20:26
  • Thank you very much! After some extensive testing, careful partitioning did indeed do the trick! – d125q May 17 '18 at 13:06