
I want to join two very big tables on a specific mutual key using Spark, and I'm trying to understand the optimal way to do that.

Let's say, for the example:

  • table 1 contains 900M rows and ~100 columns
  • table 2 contains 600M rows and ~200 columns
  • We can't use a broadcast join; the tables are too big to be broadcast.

I want to inner join the tables using the mutual 'id' column that exists in both of them. In addition, I know that the 'id' column contains the same values in both tables: there is no id value that exists in one but not in the other.

The ideal way I can think of is to "divide" each of my tables into partitions/buckets that contain the same 'id' values and to send them to the same executor, which will calculate the join result with minimum data shuffling in the cluster (see the sketch after my questions below).

My questions are:

  1. If I use, for example, .repartition(5, 'id') on each of the tables, will each of the 5 partitions contain the same 'id' values? (as long as we have the same 'id' values in both of them)

for example:

df1
+---+---+------+
|age| id|  name|
+---+---+------+
|  5|  1| David|
| 50|  2|  Lily|
| 10|  3|   Dan|
| 15|  4|Nicole|
| 16|  5|  Dana|
| 19|  6|   Ron|
| 20|  7| Alice|
| 22|  8|  Nora|
| 45|  9|  Sara|
| 70| 10| Aaron|
+---+---+------+


df2
+---+-----+
| id|price|
+---+-----+
|  1| 30.8|
|  1| 40.3|
|  2|100.0|
|  2| 30.1|
|  3| 99.0|
|  3|102.0|
|  4| 81.2|
|  4| 91.2|
|  5| 73.4|
|  6| 22.2|
|  7|374.4|
|  8|669.7|
|  9|  4.8|
| 10|35.38|
+---+-----+

df1.repartition(5,'id')
df2.repartition(5,'id')

If df1 partitions are: [id=1,id=2],[id=3,id=4],[id=5,id=6],[id=7,id=8],[id=9,id=10]

Will it necessarily be the same for df2?

  2. If I use 'bucketBy' in the same way, will I get the same 'id' values in the buckets of both tables?

  3. Will Spark send the right partitions to the same executor? I mean, will the partition that contains [id=1,id=2] of table 1 and the partition that contains [id=1,id=2] of table 2 be sent to the same executor for the join?
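
To make it concrete, here is roughly what I have in mind for questions 1 and 2. This is only a sketch, assuming PySpark with an existing SparkSession called `spark` and that df1 and df2 are already loaded; the bucket count of 64 and the table names are just placeholders:

# Idea 1: repartition both sides on the join key before joining.
# Both calls use Spark's hash partitioning, so a given 'id' value should
# land in the same partition index of df1 and df2.
df1_p = df1.repartition(5, "id")
df2_p = df2.repartition(5, "id")
df1_p.join(df2_p, on="id", how="inner").explain()

# Idea 2: pre-bucket both tables on the same key with the same number of
# buckets, so a later join can read matching buckets and (hopefully) skip
# the shuffle at join time.
(df1.write.bucketBy(64, "id").sortBy("id")
    .mode("overwrite").saveAsTable("table1_bucketed"))
(df2.write.bucketBy(64, "id").sortBy("id")
    .mode("overwrite").saveAsTable("table2_bucketed"))
t1 = spark.table("table1_bucketed")
t2 = spark.table("table2_bucketed")
t1.join(t2, on="id", how="inner").explain()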

If I'm missing something, or you can recommend another way to join two big tables under the assumptions I mentioned, it would be very helpful.

nofar mishraki

1 Answer


Take a look at this answer.
TL;DR: If you want to join them only once and that is the only aim of the re-partitioning, just join them directly.
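
A minimal sketch of what I mean, assuming PySpark and that df1 and df2 are already loaded (the output path is a placeholder). For two large tables on an equality key, Spark will typically plan a sort-merge join, shuffling each side once by the join key:

# Plain inner join; let Spark pick the strategy (usually SortMergeJoin here).
result = df1.join(df2, on="id", how="inner")
result.explain()  # look for SortMergeJoin and Exchange (shuffle) nodes in the plan
result.write.mode("overwrite").parquet("/tmp/join_result")  # placeholder path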

  • Hi, thanks. So, is there anything smart I can do to "help" the execution plan and reduce shuffle if I join 2 big tables only once? I'm asking because sometimes I see a lot of shuffle in the cluster that fails my jobs, and I try to restructure the data beforehand in a way that reduces shuffle. – nofar mishraki Jun 16 '20 at 08:12
  • I guess Spark chooses the best strategy, so if your cluster fails to run it you have two options left: 1. scale up; 2. break your task into smaller ones; it takes more time to run but it would work. For example, you can filter both tables on id%10==0 and perform the join on them and store the result, then id%10==2, etc. @nofar – Ebrahim Badrestani Jun 16 '20 at 09:09
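
A rough sketch of the chunked approach from the last comment (my own illustration, assuming PySpark, an integer 'id' column, and Parquet output; the path is a placeholder):

from pyspark.sql import functions as F

# Join the tables in 10 smaller pieces, one residue class of 'id' at a time,
# appending each partial result to the same output location.
for i in range(10):
    part1 = df1.filter(F.col("id") % 10 == i)
    part2 = df2.filter(F.col("id") % 10 == i)
    (part1.join(part2, on="id", how="inner")
          .write.mode("append").parquet("/tmp/join_result_chunked"))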