
I have two DataFrames that I want to join using a left join.

df1 =

+----------+---------------+
|product_PK| rec_product_PK|
+----------+---------------+
|       560|            630|
|       710|            240|
|       610|            240|

df2 =

+----------+---------------+-----+
|product_PK| rec_product_PK| rank|
+----------+---------------+-----+
|       560|            610|    1|
|       560|            240|    1|
|       610|            240|    0|

The problem is that df1 contains only 500 rows, while df2 contains 600.000.000 rows and 24 partitions. The left join takes a very long time to execute: I have been waiting for 5 hours and it has still not finished.

val result = df1.join(df2,Seq("product_PK","rec_product_PK"),"left")

The result should contain 500 rows. I execute the code from spark-shell using the following parameters:

spark-shell --driver-memory 10G --driver-cores 4 --executor-memory 10G --num-executors 2 --executor-cores 4

How can I speed up the process?

UPDATE:

The output of df2.explain(true):

== Parsed Logical Plan ==
Repartition 5000, true
+- Project [product_PK#15L AS product_PK#195L, product_PK#189L AS reco_product_PK#196L, col2#190 AS rank#197]
   +- Project [product_PK#15L, array_elem#184.product_PK AS product_PK#189L, array_elem#184.col2 AS col2#190]
      +- Project [product_PK#15L, products#16, array_elem#184]
         +- Generate explode(products#16), true, false, [array_elem#184]
            +- Relation[product_PK#15L,products#16] parquet

== Analyzed Logical Plan ==
product_PK: bigint, rec_product_PK: bigint, rank: int
Repartition 5000, true
+- Project [product_PK#15L AS product_PK#195L, product_PK#189L AS reco_product_PK#196L, col2#190 AS rank_product_family#197]
   +- Project [product_PK#15L, array_elem#184.product_PK AS product_PK#189L, array_elem#184.col2 AS col2#190]
      +- Project [product_PK#15L, products#16, array_elem#184]
         +- Generate explode(products#16), true, false, [array_elem#184]
            +- Relation[product_PK#15L,products#16] parquet

== Optimized Logical Plan ==
Repartition 5000, true
+- Project [product_PK#15L, array_elem#184.product_PK AS rec_product_PK#196L, array_elem#184.col2 AS rank#197]
   +- Generate explode(products#16), true, false, [array_elem#184]
      +- Relation[product_PK#15L,products#16] parquet

== Physical Plan ==
Exchange RoundRobinPartitioning(5000)
+- *Project [product_PK#15L, array_elem#184.product_PK AS rec_PK#196L, array_elem#184.col2 AS rank#197]
   +- Generate explode(products#16), true, false, [array_elem#184]
      +- *FileScan parquet [product_PK#15L,products#16] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://data/result/2017-11-27/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_PK:bigint,products:array<struct<product_PK:bigint,col2:int>>>
zero323
Markus
  • @RaphaelRoth: I don't think that `600.000.000` is such a huge number of rows. Therefore I cannot understand why filtering or joining takes such a long time (I mean `count()` or `show()` after these operations). – Markus Jan 28 '18 at 19:58
  • Maybe your data is still too large given that you have only 24 partitions. With your configuration, your maximum task size is limited to ~ 0.5*10/4=1.25 GB. You should try to increase the number of partitions for df2 (using e.g. `df2.repartition(1000)`). Or try to set `spark.sql.shuffle.partitions` to something like 5000 – Raphael Roth Jan 28 '18 at 20:01
  • @RaphaelRoth: So, if I understood correctly, I should first apply `df2.repartition(1000)` and then do the filtering. Finally I should do `count()` and then `join`. Right? – Markus Jan 28 '18 at 20:09
  • I would not do the filtering, just the broadcast-join (but I assume spark is doing a broadcast-join anyway, you should check the physical plan to see what spark is doing). In general I would try to increase parallelism, and also keep an eye on the Spark UI to check which stage is causing problems – Raphael Roth Jan 28 '18 at 20:12
  • @RaphaelRoth: How do I increase parallelism? Would you increase `spark.sql.autoBroadcastJoinThreshold` to more MB? – Markus Jan 28 '18 at 20:19
  • No, I would not increase `spark.sql.autoBroadcastJoinThreshold`, as you can simply force a broadcast join using the broadcast hint as mentioned in the answers. I would just try to increase the number of partitions of df2 – Raphael Roth Jan 28 '18 at 20:22
  • @RaphaelRoth: Ok, I see. I am now testing 1000 partitions. If it takes too long again, I will try `df2.repartition(2000)` and so on. – Markus Jan 28 '18 at 20:26
  • Note that `explode` can be extremely slow in Spark 2.x (see https://issues.apache.org/jira/browse/SPARK-21657?attachmentOrder=desc), you should post your entire code. Maybe it's not the join but the explode which is causing problems! – Raphael Roth Jan 29 '18 at 06:31
  • @RaphaelRoth: Please check this question that I posted some days ago (https://stackoverflow.com/questions/48480647/how-to-transform-dataframe-before-joining-operation). I posted the example and code. I use the solution proposed there. – Markus Jan 29 '18 at 09:53
  • @RaphaelRoth: I've just checked `count()` right after `explode` (before `join`). It took some 3 minutes. So, I assume that the problem is with `join`, because when I put `count` after `join` the process gets stuck (with any repartitioning). – Markus Jan 29 '18 at 09:55
  • Note that count does NOT fully evaluate the dataframe, you need to do df.rdd.count – Raphael Roth Jan 29 '18 at 10:08
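
The per-task memory estimate quoted in the comments (0.5*10/4) can be reproduced with a few lines of plain Scala. This is only a sketch of that back-of-the-envelope arithmetic: the 0.5 factor is the commenter's rough usable-memory fraction, not a value read from the actual cluster configuration, and the variable names are made up for illustration.

```scala
// Values taken from the spark-shell command line in the question.
val executorMemoryGb = 10.0 // --executor-memory 10G
val executorCores = 4       // --executor-cores 4

// Assumption: rough fraction of executor memory usable by tasks,
// as used in the comment's estimate.
val usableFraction = 0.5

// Tasks running concurrently on one executor share that memory.
val perTaskGb = usableFraction * executorMemoryGb / executorCores

println(s"Approximate memory per task: $perTaskGb GB")
```

With 600.000.000 rows spread over few partitions, each task has to handle a large slice of the data within that budget, which is the reasoning behind the suggestion to increase the number of partitions.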

2 Answers


You should probably use a different type of join. By default, the join you are making assumes both DataFrames are large, and therefore a lot of shuffling is done (generally each row is hashed, the data is shuffled based on the hash, and then a per-executor join is done). You can see this by calling explain on the result to see the execution plan.

Instead consider using the broadcast hint:

val result = df2.join(broadcast(df1),Seq("product_PK","rec_product_PK"),"right")

Note that I flipped the join order so the broadcast would appear in the join parameters. The broadcast function is part of org.apache.spark.sql.functions.

This would do a broadcast join instead: df1 would be copied to all executors and the join would be done locally, avoiding the need to shuffle the large df2.
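
To check which join strategy Spark actually picks, the plan can be printed with explain. The following is a minimal sketch, assuming a local session and tiny stand-in DataFrames built from the sample rows in the question; the names `physicalPlan` and `usesBroadcastJoin` are made up for illustration. A hint is only a request, and Spark may ignore it for some join types, so the physical plan is the place to confirm what happens. Note that tables this small may be broadcast automatically, so the check is only meaningful when run against the real data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Local session for illustration only; in spark-shell a session
// named `spark` already exists and getOrCreate() returns it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explain-sketch")
  .getOrCreate()
import spark.implicits._

// Tiny stand-ins for the real df1 (500 rows) and df2 (600.000.000 rows).
val df1 = Seq((560L, 630L), (710L, 240L), (610L, 240L))
  .toDF("product_PK", "rec_product_PK")
val df2 = Seq((560L, 610L, 1), (560L, 240L, 1), (610L, 240L, 0))
  .toDF("product_PK", "rec_product_PK", "rank")

val result = df2.join(broadcast(df1), Seq("product_PK", "rec_product_PK"), "right")

// Prints the parsed, analyzed, optimized and physical plans.
result.explain(true)

// Inspect the physical plan text for the join operator Spark selected.
val physicalPlan = result.queryExecution.executedPlan.toString
val usesBroadcastJoin = physicalPlan.contains("BroadcastHashJoin")
println(s"BroadcastHashJoin selected: $usesBroadcastJoin")
```

If the physical plan shows a SortMergeJoin with Exchange operators on both inputs, the large DataFrame is still being shuffled and the hint has not taken effect.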

Assaf Mendelson
  • Could you please show how to use `explain`? Also, could you please put some reference to what you explained in the first paragraph? It's an interesting fact that I would like to read more about. – Markus Jan 28 '18 at 15:26
  • I increased the number of executors to 6 (in my case this is the maximum) and used `broadcast` as you explained. It seems to get stuck again. Waiting. I use `result.show()` to push Spark to execute the join. – Markus Jan 28 '18 at 15:38
  • Why did you change the join order? You can also write `broadcast(df1).join(df2, Seq("product_PK","rec_product_PK"), "left")` – Raphael Roth Jan 28 '18 at 16:35
  • Unless the auto broadcast threshold is very low, there is no difference compared to the code that OP already has. – Alper t. Turker Jan 28 '18 at 19:55
  • @user8371915 auto broadcast doesn't always work. Specifically if the dataframe is created from an external source such as file and it has not been added to hive metastore. – Assaf Mendelson Jan 29 '18 at 07:22

Given the exceptionally small size of your df1, it might be worth first collecting it into a list and using that list to filter the large df2 down to a comparably small DataFrame, which is then used for a left join with df1:

val df1 = Seq(
  (560L, 630L),
  (710L, 240L),
  (610L, 240L)
).toDF("product_PK", "rec_product_PK")

val df2 = Seq(
  (560L, 610L, 1),
  (560L, 240L, 1),
  (610L, 240L, 0)
).toDF("product_PK", "rec_product_PK", "rank")

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val pkList = df1.collect.map{
  case Row(pk1: Long, pk2: Long) => (pk1, pk2)
}.toList
// pkList: List[(Long, Long)] = List((560,630), (710,240), (610,240))

def inPkList(pkList: List[(Long, Long)]) = udf(
  (pk1: Long, pk2: Long) => pkList.contains( (pk1, pk2) )
)

val df2Filtered = df2.where( inPkList(pkList)($"product_PK", $"rec_product_PK") )
// +----------+--------------+----+
// |product_PK|rec_product_PK|rank|
// +----------+--------------+----+
// |       610|           240|   0|
// +----------+--------------+----+

df1.join(df2Filtered, Seq("product_PK", "rec_product_PK"), "left_outer")
// +----------+--------------+----+
// |product_PK|rec_product_PK|rank|
// +----------+--------------+----+
// |       560|           630|null|
// |       710|           240|null|
// |       610|           240|   0|
// +----------+--------------+----+
Leo C
  • I get this error when creating `pkList`: `scala.MatchError: [67207790,67210680] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)` – Markus Jan 28 '18 at 17:05
  • If I run `df2Filtered.count()` after the line `val df2Filtered = df2.where( inPkList(pkList)($"product_PK", $"rec_product_PK") )`, then it takes a while again. I don't know if it even will finish. – Markus Jan 28 '18 at 17:09
  • The `MatchError` is probably caused by the `Int` type in my example not matching your actual PK column type. I've updated the answer to use `Long`. – Leo C Jan 28 '18 at 17:13
  • Yes, I changed the type to Long. But again the process gets stuck, taking a while to finish. I think that 600.000.000 should not be such a great amount of data for Spark. I do not understand why this operation cannot be executed. Maybe I should not run `count` and I should just directly save `result` in S3 using `write`? – Markus Jan 28 '18 at 17:17
  • The long wait for count might imply that `df2Filtered` is still relatively big compared with `df1` (i.e. there are many rows in `df2` with PKs matching the PK list), in which case it would make sense to apply `broadcast` to `df1` for the final join. – Leo C Jan 28 '18 at 18:49
  • Doesn't `count` imply that filtering is actually executed, because Spark is lazy? When I run `val df2Filtered = df2.where( inPkList(pkList)($"product_PK", $"rec_product_PK") )`, it gets executed in a few seconds, but it's lazy execution. – Markus Jan 28 '18 at 19:32
  • This looks completely pointless. You filter out records which would be efficiently removed in OP's code, but replace constant-time lookups with an _O(m)_ linear search, where m = 500. Why on earth? – Alper t. Turker Jan 28 '18 at 20:37
  • Point well taken, although I see a 500-element in-memory search as a trivial cost. I, too, would've first tried `join` with `broadcast` before the suggested `collect` approach. I suppose the main question is whether Spark can filter a single DataFrame more efficiently than it can join that DataFrame with another one. – Leo C Jan 28 '18 at 21:35
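
On the linear-search point raised in the comments: if the collect-and-filter approach is kept, the collected keys could be held in a `Set` rather than a `List`, so that each membership test is a hash lookup instead of a scan over all keys. The sketch below is plain Scala using the sample keys from the answer; `pkSet` is a name introduced here for illustration and is not part of the original answer.

```scala
// Keys as collected from df1 in the answer above.
val pkList: List[(Long, Long)] = List((560L, 630L), (710L, 240L), (610L, 240L))

// Same keys as a hash-based set: membership tests no longer scan every element.
val pkSet: Set[(Long, Long)] = pkList.toSet

// Both lookups agree on the result; only the cost per lookup differs.
val foundInList = pkList.contains((610L, 240L)) // linear scan over the list
val foundInSet = pkSet.contains((610L, 240L))   // hash-based lookup

println(s"list: $foundInList, set: $foundInSet")
```

The UDF in the answer could close over `pkSet` in the same way it closes over `pkList`. This only changes the per-row lookup cost; it does not avoid scanning all of df2.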