
I have two large datasets, A and B, which I wish to join on key K.

Each dataset contains many rows with the same value of K, so this is a many-to-many join.

This join fails with memory-related errors if I just try it naively.

Let's also say that grouping both datasets by K, doing the join, and then exploding back out with some trickery to get the correct result isn't a viable option, again due to memory issues.

Are there any clever tricks people have found that improve the chance of this working?


Update:

Adding a very, very contrived concrete example:

spark-shell --master local[4] --driver-memory 5G --conf spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.sql.shuffle.partitions=10000 --conf spark.default.parallelism=10000

val numbersA = (1 to 100000).toList.toDS
val numbersWithDataA = numbersA.repartition(10000).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataA.write.mode("overwrite").parquet("numbersWithDataA.parquet")

val numbersB = (1 to 100).toList.toDS
val numbersWithDataB = numbersB.repartition(100).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataB.write.mode("overwrite").parquet("numbersWithDataB.parquet")


val numbersWithDataInA = spark.read.parquet("numbersWithDataA.parquet").toDF("numberA", "one", "dataA")
val numbersWithDataInB = spark.read.parquet("numbersWithDataB.parquet").toDF("numberB", "one", "dataB")

numbersWithDataInA.join(numbersWithDataInB, Seq("one")).write.mode("overwrite").parquet("joined.parquet")

This fails with: Caused by: java.lang.OutOfMemoryError: Java heap space


1 Answer

--conf spark.sql.autoBroadcastJoinThreshold=-1

means you are disabling the broadcast join feature entirely.

You can change it to any suitable value below 2 GB (there is a 2 GB limit on broadcasts). spark.sql.autoBroadcastJoinThreshold defaults to 10 MB according to the Spark documentation. I don't know why you have disabled it. If you disable it, SparkStrategies will switch the plan to a sort-merge join or a shuffle hash join. See my article for details.
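For illustration, a minimal sketch of the two usual ways to control this, reusing the DataFrames from the question's example (the 100 MB threshold is just an example value; it only needs to stay below the 2 GB broadcast limit):

import org.apache.spark.sql.functions.broadcast

// Re-enable automatic broadcasting for tables up to 100 MB
// (any value below the 2 GB broadcast limit works).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Or keep the threshold disabled and explicitly broadcast only the small side.
val joined = numbersWithDataInA.join(broadcast(numbersWithDataInB), Seq("one"))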

Other than that, I don't think anything needs to change, since this is a common pattern for joining two datasets.

Further reading: DataFrame join optimization - Broadcast Hash Join

UPDATE: Alternatively, in your real example (not the contrived one :-)), you can do these steps.

Steps :

1) For each dataset, find the distinct values of the join key (for example, a category, country, or state field) and collect them as an array; since this is small data, collecting it is fine.

2) For each category element in the array, join the two datasets (so you are only ever joining small slices) with the category as a where condition, and add the result to a sequence of DataFrames.

3) Reduce and union these DataFrames. Scala example:

val dfCategories = Seq(dfCategory1, dfCategory2, dfCategory3)
dfCategories.reduce(_ union _)

Note: for each join I still prefer BHJ (broadcast hash join), since there will be less or no shuffle. A rough end-to-end sketch of these steps follows.
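Here is a self-contained sketch of steps 1 to 3, assuming the real tables are called dfA and dfB and are joined on a low-cardinality column named category (these names are placeholders, not from the question):

import spark.implicits._

// 1) Collect the distinct join-key values (assumed small enough to collect).
val categories = dfA.select("category").distinct.as[String].collect()

// 2) Join the two datasets one category at a time, so each individual join
//    only has to deal with a small slice of both tables.
val perCategoryJoins = categories.map { c =>
  dfA.filter($"category" === c).join(dfB.filter($"category" === c), Seq("category"))
}

// 3) Union the per-category results back into a single DataFrame.
val joined = perCategoryJoins.reduce(_ union _)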

  • This is why I didn't initially post an example, because I knew as soon as I did, people would try to solve the example. My actual data is on the scale of hundreds of GB in each table, so broadcasting simply isn't an option. – user2682459 Mar 19 '20 at 21:45
  • OK. Have you tried increasing executor memory and the number of executors? If it's SMJ or SHJ, it needs more memory. Caching the data might also help in some cases. – Ram Ghadiyaram Mar 19 '20 at 21:48
  • Hi, can you post the explain plan of the real join? For performance tuning this is vital. – Ram Ghadiyaram Mar 19 '20 at 23:21
  • Splitting into many separate joins does make sense. Wish Spark handled this sort of thing natively! – user2682459 Mar 20 '20 at 17:21
  • Yes, please try it once to avoid the OOM issue. Also, I hope you have enough memory on the executors/driver and passed the conf through spark-submit. – Ram Ghadiyaram Mar 20 '20 at 17:55
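
For completeness, a hypothetical spark-submit invocation along the lines of these comments (the executor count, memory sizes, partition count, and jar/class names are all placeholders, not values from this thread):

spark-submit \
  --master yarn \
  --num-executors 20 \
  --executor-cores 4 \
  --executor-memory 16G \
  --driver-memory 8G \
  --conf spark.sql.shuffle.partitions=2000 \
  --class com.example.JoinJob \
  join-job.jar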