
I am wondering whether we can force Spark to use a custom partitioning key during a join operation with two dataframes.

For example, let's consider

df1: DataFrame - [groupid, other_column_a]
df2: DataFrame - [groupid, other_column_b]

If I run

df_join = df1.join(df2, "groupid")

Spark will set "groupid" as the partition key and perform the join on each partition. The problem is that this can run out of memory on a machine if a partition is too big.

However, it seems theoretically possible to perform the join with say (groupid, other_column_a) as the partitioning key (to reduce the size of each partition).

Is it possible to do this with Spark? I tried doing df1.repartition("groupid", "other_column_a") upfront, but this is overridden by the join (I checked it with df_join.explain()). I can't find any resource online that explains how to do this.
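
For reference, a minimal runnable sketch of what I tried (the toy rows and the broadcast-join setting are only there so the shuffle is visible in the plan):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# disable auto-broadcast so the tiny toy dataframes still go through a shuffle join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ["groupid", "other_column_a"])
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["groupid", "other_column_b"])

# repartition by (groupid, other_column_a) before joining...
df1_repart = df1.repartition("groupid", "other_column_a")

df_join = df1_repart.join(df2, "groupid")

# ...but the physical plan still shows an exchange hash-partitioned on groupid alone
df_join.explain()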

Thanks!

Visual explanation

hyperc54

2 Answers


If you are joining on some integer column id, you can partition your dataframes by id modulo some number, i.e. however many partitions you want. This way, the ids that share a common hash value will be grouped together in one partition. You can then perform your join by breaking it down into multiple joins, joining each partition serially in a loop. I have explained this case in detail here: Efficient pyspark join
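
Something along these lines (a rough sketch only, not the exact code from the linked answer; it assumes groupid is an integer column and that df1/df2 stand in for your two dataframes):

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
n_buckets = 8  # how many smaller joins to break the big join into

df1 = spark.table("df1")  # [groupid, other_column_a], stand-in for your dataframe
df2 = spark.table("df2")  # [groupid, other_column_b]

# tag every row with groupid modulo n_buckets
df1_b = df1.withColumn("bucket", F.col("groupid") % n_buckets)
df2_b = df2.withColumn("bucket", F.col("groupid") % n_buckets)

partial_joins = []
for b in range(n_buckets):
    left = df1_b.filter(F.col("bucket") == b).drop("bucket")
    right = df2_b.filter(F.col("bucket") == b).drop("bucket")
    # each pass only shuffles the groupids that fall into this bucket
    partial_joins.append(left.join(right, "groupid"))

# stitch the partial results back together
df_join = reduce(lambda a, b: a.unionByName(b), partial_joins)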

vikrant rana
  • Thanks for your answer! It feels like it should work. It's quite cumbersome to do, though. I am curious why this isn't easier to do in Spark! – hyperc54 Jan 02 '19 at 10:32
  • You can also use glom and the spark_partition_id method to see how rows are being distributed in each partition (a quick sketch is below, after these comments). Let me know in case you need details on this. – vikrant rana Jan 02 '19 at 11:33
  • @vikrantrana I had some Qs on partitioners; maybe I'll ask you if I don't find answers. Incidentally, I was working on customised partitioners today in `PySpark` :) – cph_sto Jun 27 '19 at 11:15
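
A quick sketch of the inspection mentioned in the comment above (toy data, just to show the two calls):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).withColumn("groupid", F.col("id") % 5)  # toy data
df_repart = df.repartition("groupid")

# rows per partition, via spark_partition_id()
df_repart.withColumn("pid", F.spark_partition_id()).groupBy("pid").count().show()

# or inspect whole partitions with glom() on the underlying RDD
print(df_repart.rdd.glom().map(len).collect())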

However, it seems theoretically possible to perform the join with say (groupid, other_column_a)

That's not correct. To perform the join, Spark has to move all records with the same groupid to a single partition, therefore using (groupid, other_column_a) would be possible only if:

  • (groupid, other_column_a) were the join keys.
  • There was a functional relationship between other_column_a and groupid.

The first condition is clearly not satisfied, since you join only on groupid; the second wouldn't resolve the problem, as the distribution would be the same or worse.

There are other possible solutions for skewed joins, like separate handling of skewed groups or iterative broadcast joins (see the answer and comments in Spark final task takes 100x times longer than first 199, how to improve).
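
For illustration, a rough sketch of the "separate handling of skewed groups" idea (the count threshold and the assumption that df2's rows for the skewed keys fit in a broadcast are mine, not a general recipe):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
df1 = spark.table("df1")  # [groupid, other_column_a], stand-ins for your dataframes
df2 = spark.table("df2")  # [groupid, other_column_b]

# find groupids that are heavily over-represented on the left side (threshold is illustrative)
skewed_keys = [r["groupid"] for r in
               df1.groupBy("groupid").count().filter(F.col("count") > 1000000).select("groupid").collect()]

df1_skewed = df1.filter(F.col("groupid").isin(skewed_keys))
df1_rest = df1.filter(~F.col("groupid").isin(skewed_keys))

# skewed keys are joined against a broadcast slice of df2, so they avoid one giant shuffle partition
joined_skewed = df1_skewed.join(broadcast(df2.filter(F.col("groupid").isin(skewed_keys))), "groupid")

# everything else goes through the normal shuffle join
joined_rest = df1_rest.join(df2, "groupid")

df_join = joined_skewed.unionByName(joined_rest)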

  • Hi! Thanks for your answer; however, it is not clear to me why _"Spark has to move all records with groupid to a single partition"_. Could you explain the logic or send a link which explains this? – hyperc54 Nov 29 '18 at 12:57
  • I just added a picture to accompany my question – hyperc54 Nov 29 '18 at 13:12