Consider the following code:
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{count, sum}

case class Person(
  personId: Long, name: String, ageGroup: String, gender: String,
  relationshipStatus: String, country: String, state: String
)
case class PerPersonPower(personId: Long, power: Double)

val people: Dataset[Person] = ...         // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.

people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )
It is executed on a cluster with approximately 100 GB of RAM in total. However, the job runs out of memory, and I am not sure what to do about it. In reality, people is partitioned by $"personId" and cached beforehand -- people.repartition($"personId").cache().
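To make the setup concrete, here is a minimal sketch of how the pieces fit together, assuming a standard SparkSession named spark and Parquet inputs; the bucket paths are placeholders, not my real ones:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, sum}

// Assumption: a plain SparkSession; the input paths below are hypothetical.
val spark = SparkSession.builder().appName("power-aggregation").getOrCreate()
import spark.implicits._

val people = spark.read.parquet("gs://my-bucket/people").as[Person]
val powers = spark.read.parquet("gs://my-bucket/powers").as[PerPersonPower]

// As described above: people is repartitioned by personId and cached
// before the join.
val cachedPeople = people.repartition($"personId").cache()

val result = cachedPeople.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )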
Any ideas how I might optimize this computation?
The cluster is a vanilla Google Dataproc cluster --- so it uses YARN in client mode --- consisting of 14 nodes with 8 GB RAM each.