I have two DataFrames that I want to combine with a left join.
df1 =
+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
|       560|            630|
|       710|            240|
|       610|            240|
+----------+---------------+
df2 =
+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
|       560|            610|    1|
|       560|            240|    1|
|       610|            240|    0|
+----------+---------------+-----+
The problem is that df1 contains only 500 rows, while df2 contains 600,000,000 rows in 24 partitions. My left join takes a very long time to execute: it has been running for 5 hours now and is still not finished.
val result = df1.join(df2, Seq("product_PK", "rec_product_PK"), "left")
The result should contain 500 rows. I execute the code from spark-shell using the following parameters:
spark-shell --driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4
How can I speed up the process?
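One workaround I am considering (a rough sketch, not yet tested): since df1 has only 500 rows, I could collect its product_PK values on the driver, pre-filter df2 down to the rows that can possibly match, and then broadcast the filtered side instead of shuffling the full table. The names keys and df2Filtered are just placeholders I made up for this sketch:

import org.apache.spark.sql.functions.{broadcast, col}

// df1 is tiny (500 rows), so collecting its keys to the driver is cheap
val keys = df1.select("product_PK").distinct().collect().map(_.getLong(0))

// Prune df2 before the join: only rows whose product_PK appears in df1
// can survive the join anyway, so this filter preserves the join result
val df2Filtered = df2.filter(col("product_PK").isin(keys: _*))

// The filtered side should now be small, so hint Spark to broadcast it
// rather than shuffle 600 million rows across the cluster
val result = df1.join(broadcast(df2Filtered),
  Seq("product_PK", "rec_product_PK"), "left")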
UPDATE:
The output of df2.explain(true):
== Parsed Logical Plan ==
Repartition 5000, true
+- Project [product_PK#15L AS product_PK#195L, product_PK#189L AS rec_product_PK#196L, col2#190 AS rank#197]
+- Project [product_PK#15L, array_elem#184.product_PK AS product_PK#189L, array_elem#184.col2 AS col2#190]
+- Project [product_PK#15L, products#16, array_elem#184]
+- Generate explode(products#16), true, false, [array_elem#184]
+- Relation[product_PK#15L,products#16] parquet
== Analyzed Logical Plan ==
product_PK: bigint, rec_product_PK: bigint, rank: int
Repartition 5000, true
+- Project [product_PK#15L AS product_PK#195L, product_PK#189L AS rec_product_PK#196L, col2#190 AS rank#197]
+- Project [product_PK#15L, array_elem#184.product_PK AS product_PK#189L, array_elem#184.col2 AS col2#190]
+- Project [product_PK#15L, products#16, array_elem#184]
+- Generate explode(products#16), true, false, [array_elem#184]
+- Relation[product_PK#15L,products#16] parquet
== Optimized Logical Plan ==
Repartition 5000, true
+- Project [product_PK#15L, array_elem#184.product_PK AS rec_product_PK#196L, array_elem#184.col2 AS rank#197]
+- Generate explode(products#16), true, false, [array_elem#184]
+- Relation[product_PK#15L,products#16] parquet
== Physical Plan ==
Exchange RoundRobinPartitioning(5000)
+- *Project [product_PK#15L, array_elem#184.product_PK AS rec_product_PK#196L, array_elem#184.col2 AS rank#197]
+- Generate explode(products#16), true, false, [array_elem#184]
+- *FileScan parquet [product_PK#15L,products#16] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://data/result/2017-11-27/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_PK:bigint,products:array<struct<product_PK:bigint,col2:int>>>
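If I read the physical plan correctly, the Exchange RoundRobinPartitioning(5000) at the top means all 600 million rows are shuffled into 5000 partitions before the join even starts. Below is my rough reconstruction of how df2 seems to be built, based on the plan above (the S3 path stays abbreviated as in the scan); dropping the repartition step should remove that exchange:

import org.apache.spark.sql.functions.{col, explode}

// Reconstructed from the plan above; the actual path is abbreviated
val df2 = spark.read.parquet("s3://data/result/2017-11-27/...")
  .select(col("product_PK"), explode(col("products")).as("array_elem"))
  .select(col("product_PK"),
    col("array_elem.product_PK").as("rec_product_PK"),
    col("array_elem.col2").as("rank"))
// no .repartition(5000) here -- that call is what produces the
// Exchange RoundRobinPartitioning(5000) node in the physical plan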