I need to join many DataFrames together based on some shared key columns. For a key-value RDD, one can specify a partitioner so that data points with the same key are shuffled to the same executor, which makes joining more efficient (if there are shuffle-related operations before the join). Can the same thing be done on Spark DataFrames or Datasets?
@Shaido what steps should we follow when using coalesce() to get optimal performance from a Spark job? – Shasu Mar 07 '22 at 11:41
2 Answers
You can repartition a DataFrame after loading it if you know you'll be joining it multiple times:
val users = spark.read.load("/path/to/users").repartition($"userId") // $"..." needs import spark.implicits._
val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition
val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned
So it'll shuffle the data once and then reuse the shuffle files when joining subsequent times.
However, if you know you'll be repeatedly shuffling data on certain keys, your best bet would be to save the data as bucketed tables. This will write the data out already pre-hash partitioned, so when you read the tables in and join them you avoid the shuffle. You can do so as follows:
// you need to pick a number of buckets that makes sense for your data
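// bucketBy is a DataFrameWriter method, so it is reached through .write
users.write.bucketBy(50, "userId").saveAsTable("users")
addresses.write.bucketBy(50, "userId").saveAsTable("addresses")
// read the bucketed tables back under new names
val bucketedUsers = spark.read.table("users")
val bucketedAddresses = spark.read.table("addresses")
val joined = bucketedUsers.join(bucketedAddresses, "userId")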
joined.show() // <-- no shuffle since tables are co-partitioned
In order to avoid a shuffle, the tables have to use the same bucketing (e.g. same number of buckets and joining on the bucket columns).
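A way to check whether bucketing actually removes the shuffle is to look at the physical plan of the join. The following is only a minimal sketch under stated assumptions: the local session, the sample rows, the table names `users_bucketed` and `addresses_bucketed`, and the bucket count of 4 are all made up for illustration. Broadcast joins are switched off because tables this small would otherwise be broadcast, which hides the effect.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; app name and master are assumptions.
val spark = SparkSession.builder()
  .appName("bucketed-join-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Turn off broadcast joins so the tiny sample tables go through a regular join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical sample data.
val usersDf = Seq((1, "ann"), (2, "bob")).toDF("userId", "name")
val addressesDf = Seq((1, "Oslo"), (2, "Rome")).toDF("userId", "city")

// Both tables use the same bucket count and the same bucket column.
// mode("overwrite") replaces the table if it already exists from an earlier run.
usersDf.write.mode("overwrite").bucketBy(4, "userId").saveAsTable("users_bucketed")
addressesDf.write.mode("overwrite").bucketBy(4, "userId").saveAsTable("addresses_bucketed")

val joined = spark.table("users_bucketed")
  .join(spark.table("addresses_bucketed"), "userId")

// Print the physical plan; an "Exchange" node would indicate a shuffle.
joined.explain()
joined.show()
```

If bucketing takes effect, the printed plan should contain no `Exchange` node on either side of the join. Without an explicit path, `saveAsTable` writes under the warehouse directory configured by `spark.sql.warehouse.dir`.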

It won't hurt but won't provide any benefit. Spark would need to shuffle anyhow for the join, unless it's a broadcast. – Silvio Oct 25 '19 at 14:08
I'm trying to understand why, for Spark to optimize the second job, you need to explicitly repartition by userId. Wouldn't Spark know after the first job, which required a shuffle, that the data is now partitioned by userId? – allstar Feb 21 '20 at 01:14
@Silvio, thank you. When saving with saveAsTable("users"), where is the data saved? Will it be overwritten on the next run of the job, and how should that be handled? My data source is an S3 path. – Shasu Mar 07 '22 at 11:40
This is possible with the DataFrame/Dataset API through the repartition method. With it you can specify one or more columns to partition the data by, e.g.
val df2 = df.repartition($"colA", $"colB")
It is also possible to specify the desired number of partitions in the same call:
val df2 = df.repartition(10, $"colA", $"colB")
Note: this does not guarantee that the partitions for the dataframes will be located on the same node, only that the partitioning is done in the same way.
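To make the point about "partitioning done in the same way" concrete, here is a small sketch that repartitions two DataFrames on the same columns with the same partition count before joining them. The sample data, the `other` DataFrame, and the local session are assumptions for illustration; only `repartition` with a count and columns comes from the answer above. Broadcast joins are disabled because such small inputs would otherwise be broadcast.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; app name and master are assumptions.
val spark = SparkSession.builder()
  .appName("repartition-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Turn off broadcast joins so the plan shows how the partitioning is used.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical sample data; colA and colB follow the snippets above.
val df = Seq((1, "x", 10), (2, "y", 20), (3, "z", 30)).toDF("colA", "colB", "value")
val other = Seq((1, "x", "p"), (2, "y", "q")).toDF("colA", "colB", "label")

// Same columns and same partition count on both sides.
val df2 = df.repartition(10, $"colA", $"colB")
val other2 = other.repartition(10, $"colA", $"colB")

// An explicit count yields exactly that many partitions.
println(df2.rdd.getNumPartitions) // 10

val joined = df2.join(other2, Seq("colA", "colB"))

// The plan shows the repartition Exchange on each side; Spark should not
// need to add a further Exchange for the join itself.
joined.explain()
```

When the two sides use different partition counts or different columns, the plan is the place to check whether Spark inserted an additional shuffle before the join.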

Thank you. What exactly is the difference between the two versions above? In the second case, how can empty partitions be avoided? – Shasu Mar 07 '22 at 11:36
@Shyam: The first line will use the configured default number of partitions (200). Maybe this can help for the second question: https://stackoverflow.com/questions/50694848/why-do-i-get-so-many-empty-partitions-when-repartionning-a-spark-dataframe – Shaido Mar 07 '22 at 12:02
Thanks a lot, you are helpful as always. One more thing: my Spark job runs out of memory (OOM) in a join. How can I check the memory taken by each dataset? Is there anywhere in the Spark UI where I can see this? – Shasu Mar 07 '22 at 12:09
I am pivoting a DataFrame after grouping by several columns and it's very slow. How can I tune it? – Shasu Mar 07 '22 at 18:52