Use Bucketing
If you are using Spark v2.3 or greater, you can use bucketing to avoid the shuffle that would otherwise take place when you join the tables after writing them.
With bucketing, you place your data into buckets based on a column (usually the one you join on). When Spark reads the bucketed data back, it does not need to perform an exchange.
1. Sample Data
Transactions (Fact)
t1.sample(n=5)
ad_id impressions
30 528749
1 552233
30 24298
30 311914
60 41661
Names (Dimension)
t2.sample(n=5)
ad_id brand_name
1 McDonalds
30 McDonalds
30 Coca-Cola
1 Coca-Cola
30 Levis
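The later call to spark.createDataFrame(t1) suggests t1 and t2 are pandas DataFrames. The post does not show how they were built, so here is a minimal, purely illustrative sketch that produces data of the same shape (ad_id/impressions for the fact table, ad_id/brand_name for the dimension table); the values and sizes are made up:
import pandas as pd
import numpy as np
# Hypothetical reconstruction of the sample data; not the original dataset.
# A larger fact table of impressions keyed by ad_id...
t1 = pd.DataFrame({
    'ad_id': np.random.choice([1, 30, 60], size=100000),
    'impressions': np.random.randint(1, 600000, size=100000),
})
# ...and a small dimension table mapping ad_id to a brand name.
t2 = pd.DataFrame({
    'ad_id': np.random.choice([1, 30, 60], size=500),
    'brand_name': np.random.choice(['McDonalds', 'Coca-Cola', 'Levis'], size=500),
})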
2. Disable Broadcast Join
Since one table is large and the other is small, Spark would normally broadcast the small table and skip the shuffle entirely. To demonstrate the sort-merge join and its exchange, disable automatic broadcast joins:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
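If you are working with a SparkSession rather than a SQLContext, the same setting can be applied through the runtime config; a minimal sketch:
# Equivalent to the SQL statement above, using the SparkSession runtime config.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# Verify the setting took effect.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # -1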
3. Without Bucketing
# Convert the pandas sample DataFrames into Spark DataFrames
t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)
# Write them out as plain, unbucketed tables
t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')
# Read the tables back and explain the join
unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")
unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()
+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
+- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
:- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336] <-- 0_0
: +- Project [ad_id#1842L, impressions#1843L]
: +- Filter isnotnull(ad_id#1842L)
: +- FileScan parquet default.unbucketed_transactions
+- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337] <-- 0_0
+- Project [ad_id#1846L, brand_name#1847]
+- Filter isnotnull(ad_id#1846L)
+- FileScan parquet default.unbucketed_brands
As you can see, an exchange takes place on each side of the join because the tables are not bucketed.
4. With Bucketing
# The first argument to bucketBy is the number of buckets to create (30 here).
# The second argument is the column to bucket the data by.
unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')
unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')
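You can confirm that the bucketing metadata was recorded by describing the table; for example, DESCRIBE FORMATTED reports the number of buckets and the bucket columns:
# Inspect the bucketing metadata stored for the table.
sqlContext.sql("DESCRIBE FORMATTED bucketed_transactions").show(50, truncate=False)
# Look for rows such as "Num Buckets: 30" and "Bucket Columns: [ad_id]".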
transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")
transactions.join(brands, 'ad_id').explain()
+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
+- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
:- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
: +- Project [ad_id#1867L, impressions#1868L]
: +- Filter isnotnull(ad_id#1867L)
: +- FileScan parquet default.bucketed_transactions
+- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
+- Project [ad_id#1871L, brand_name#1872]
+- Filter isnotnull(ad_id#1871L)
+- FileScan parquet default.bucketed_brands
As the plan above shows, no exchanges take place anymore.
Thus, by avoiding the exchange, you improve the performance of the join.
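Note that Spark only takes advantage of the bucket metadata when bucketed reads are enabled (they are by default), and in this example both tables were bucketed into the same number of buckets (30) on the join key. A quick sanity check, assuming the default configuration:
# Bucketed table reads must be enabled for the exchange to be removed.
print(spark.conf.get("spark.sql.sources.bucketing.enabled"))  # 'true' by default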