I have two large datasets, A and B, which I wish to join on key K.
Each dataset contains many rows with the same value of K, so this is a many-to-many join.
This join fails with memory-related errors if I just try it naively.
Let's also say that grouping both datasets by K, joining the groups, and then exploding back out with some trickery to get the correct result (sketched below) isn't a viable option either, again due to memory issues.
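To be explicit, the "trickery" I mean is roughly the following (dfA, dfB, k, dataA and dataB are placeholder names, not from the concrete example further down):

import org.apache.spark.sql.functions.{collect_list, explode, col}

// Collect each side's payloads into one array per key, turning the
// many-to-many join into a one-to-one join on k.
val groupedA = dfA.groupBy("k").agg(collect_list("dataA").as("dataListA"))
val groupedB = dfB.groupBy("k").agg(collect_list("dataB").as("dataListB"))

// Join once per key, then explode both arrays to rebuild the per-key
// cross product. collect_list pulls every row for a key into a single
// in-memory array on one executor, which is where this blows up for
// hot keys.
val result = groupedA.join(groupedB, Seq("k"))
  .withColumn("dataA", explode(col("dataListA")))
  .withColumn("dataB", explode(col("dataListB")))
  .drop("dataListA", "dataListB")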
Are there any clever tricks people have found that improve the chances of this working?
Update:
Adding a very, very contrived concrete example:
spark-shell --master local[4] --driver-memory 5G \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.sql.shuffle.partitions=10000 \
  --conf spark.default.parallelism=10000
// Dataset A: 100,000 rows, each carrying a ~1 MB payload (~100 GB total).
val numbersA = (1 to 100000).toList.toDS
val numbersWithDataA = numbersA.repartition(10000).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataA.write.mode("overwrite").parquet("numbersWithDataA.parquet")
// Dataset B: 100 rows with the same ~1 MB payload (~100 MB total).
val numbersB = (1 to 100).toList.toDS
val numbersWithDataB = numbersB.repartition(100).map(n => (n, 1, Array.fill[Byte](1000*1000)(0)))
numbersWithDataB.write.mode("overwrite").parquet("numbersWithDataB.parquet")
// Read both datasets back with named columns. "one" is the constant 1 in
// every row, making it a worst-case many-to-many join key.
val numbersWithDataInA = spark.read.parquet("numbersWithDataA.parquet").toDF("numberA", "one", "dataA")
val numbersWithDataInB = spark.read.parquet("numbersWithDataB.parquet").toDF("numberB", "one", "dataB")
// Every row of A matches every row of B: 100,000 x 100 output rows, each ~2 MB.
numbersWithDataInA.join(numbersWithDataInB, Seq("one")).write.mode("overwrite").parquet("joined.parquet")
This fails with: Caused by: java.lang.OutOfMemoryError: Java heap space
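For what it's worth, I believe the underlying problem here is skew taken to the extreme: every row carries the same join key value, so the sort-merge join shuffles both entire inputs into a single partition no matter what spark.sql.shuffle.partitions is set to. The plan makes this visible (same session as above):

// With a single join key value, every row of both inputs hashes to the
// same shuffle partition, so one task has to sort and join everything.
numbersWithDataInA.join(numbersWithDataInB, Seq("one")).explain()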