28

I have two DataFrames A and B:

  • A has columns (id, info1, info2), with about 200 million rows
  • B has only the column id, with 1 million rows

The id column is unique in both DataFrames.

I want a new DataFrame which filters A to only include values from B.

If B were very small, I know I would do something along the lines of

val bIds = B.select("id").as[Long].collect()  // assuming id is a Long; collects B's ids to the driver
A.filter($"id".isin(bIds: _*))

but B is still pretty large, so it may not all fit as a broadcast variable.

and I know I could use

A.join(B, Seq("id"))

but that wouldn't take advantage of the uniqueness, and I'm afraid it will cause unnecessary shuffles.

What is the optimal method to achieve that task?

DeanLa
  • What makes you think that _"I'm afraid will cause unnecessary shuffles"_? – Jacek Laskowski May 08 '17 at 08:13
  • I believe Spark doesn't store all of the small dataframe on all nodes, causing it to shuffle when joining. Also, if Spark knows of the uniqueness, it could stop sending values once one has been found. Please correct me if I'm wrong. – DeanLa May 08 '17 at 08:51
  • Sounds correct, but I guess it's case by case, given that all the Spark optimizations are pretty new/young and not necessarily battle-tested. – Jacek Laskowski May 08 '17 at 09:53

3 Answers

31

If you have not applied any partitioner to DataFrame A, maybe this will help you understand the join and shuffle concepts.

Without a partitioner:

A.join(B, Seq("id"))

By default, this operation hashes all the keys of both DataFrames, sends elements with the same key hash across the network to the same machine, and then joins together the elements with the same key on that machine. Notice that here both DataFrames are shuffled across the network.
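
As a quick way to see this, you can inspect the physical plan of the plain join; a minimal sketch, assuming A and B are the DataFrames from the question:

val joined = A.join(B, Seq("id"))
// explain() prints the physical plan, showing which join strategy Spark picked
// and where the Exchange (shuffle) steps sit on each input.
joined.explain()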

With a HashPartitioner: call partitionBy() when building A (as a pair RDD keyed by id), and Spark will then know that it is hash-partitioned; calls to join() on it will take advantage of this information. In particular, when we join A with B on id, Spark will shuffle only the B RDD. Since B has less data than A, you don't need to apply a partitioner to B.

ex:

 import org.apache.spark.HashPartitioner

 // Schematic, after the Learning Spark example: A as a pair RDD keyed by id
 val A = sc.sequenceFile[Long, String]("hdfs://...") // (id, info) pairs
     .partitionBy(new HashPartitioner(100)) // create 100 partitions
     .persist()
 A.join(B) // B is a pair RDD keyed by id; only B's data is shuffled


The reference is the Learning Spark book.
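
The example above uses the RDD API (which is also why .partitionBy fails when called directly on a DataFrame, as a comment below notes). A rough DataFrame-side analogue, not taken from the book, is to repartition on the join key once and cache the result; whether Spark can actually skip re-shuffling A at join time depends on the Spark version and on the partition counts lining up:

import org.apache.spark.sql.functions.col

// Repartition A by the join key once and cache it, so A's shuffle is paid up
// front and the id-partitioned, cached data can be reused by later joins on "id".
val partitionedA = A.repartition(100, col("id")).cache()
partitionedA.join(B, Seq("id"))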

Aravind Kumar Anugula
  • What about the "cost" of `partitionBy`? Wouldn't it cause a shuffle before `join` so effectively the cost (= the number of shuffles) is going to be the same? – Jacek Laskowski May 08 '17 at 08:56
  • @JacekLaskowski I agree that partitionBy will cost you if you apply it to an existing RDD, but here you apply partitionBy while building the Dataframe/RDD itself, so it won't cost you extra. – Aravind Kumar Anugula May 08 '17 at 09:07
  • So, what's the difference between doing a "pure" `join` without `partitionBy` vs one with `partitionBy` if that's all the job does? No difference? – Jacek Laskowski May 08 '17 at 09:58
  • Pure join: by default, rows with the same key end up in different partitions, so the first step of the join is to move all data having the same key to the same partition. Join with partitionBy: when you create the dataframe, the data is already laid out so that rows with the same key sit in the same partition, so the join doesn't need to move them, which saves you a shuffle. – Aravind Kumar Anugula May 08 '17 at 10:24
  • Out of curiosity, if you do `B.join(A, Seq("id"))`, is Spark smart enough to still only shuffle the B RDD? – Justin Li Jun 14 '18 at 23:03
  • @JustinLi Do you have an answer to the point you have raised? I am also curious as to what happens when you do `B.join(A, Seq('id'))`. – cph_sto Mar 14 '19 at 19:04
  • Does .partitionBy work only for RDDs (and not dataframes)? I tried it and got an error – Luis Leal Apr 22 '20 at 22:27
  • @JacekLaskowski I think the benefit of `join` with `partitionBy` shows up only when you have subsequent operations that rely on the partitioning (such as cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup()). If there are no such subsequent operations, meaning you do the join one time only, I guess the cost is the same regarding shuffling. – jack Aug 09 '20 at 19:44
  • What is the equivalent code for this in Python? – lightbox142 Jun 18 '21 at 13:34
  • Can this be achieved in Spark SQL? Thanks – mrbrahman May 24 '22 at 04:21
26

My default advice on how to optimize joins is:

  1. Use a broadcast join if you can (from your question it seems your tables are large and a broadcast join is not an option). A broadcast join (a.k.a. map-side join in the Hadoop world) lets you very effectively join a large table (fact) with relatively small tables (dimensions) by avoiding sending all the data of the large table over the network.

    You can use the broadcast function to mark a dataset to be broadcast when used in a join operator. Spark uses the spark.sql.autoBroadcastJoinThreshold setting to control the size of a table that will be broadcast to all worker nodes when performing a join.

  2. Use the same partitioner. If two RDDs have the same partitioner, the join will not cause a shuffle. Note however, that the lack of a shuffle does not mean that no data will have to be moved between nodes. It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located). This situation is still better than doing a shuffle, but it's something to keep in mind. Co-location can improve performance, but is hard to guarantee.

  3. If the data is huge and/or your clusters cannot grow such that even (2) above leads to OOM, use a two-pass approach. First, re-partition the data and persist using partitioned tables (dataframe.write.partitionBy()). Then, join sub-partitions serially in a loop, "appending" to the same final result table.
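
    A rough sketch of this two-pass approach, assuming id is numeric in both tables; the bucket count, the derived bucket column, and the output paths are made up for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().getOrCreate()
    val numBuckets = 16 // made-up bucket count; A and B are the DataFrames from the question

    // First pass: write both tables out partitioned by a derived bucket column.
    A.withColumn("bucket", col("id") % numBuckets)
      .write.partitionBy("bucket").parquet("/tmp/A_bucketed")
    B.withColumn("bucket", col("id") % numBuckets)
      .write.partitionBy("bucket").parquet("/tmp/B_bucketed")

    // Second pass: join the matching buckets one at a time, appending to one result table.
    for (b <- 0 until numBuckets) {
      val aPart = spark.read.parquet(s"/tmp/A_bucketed/bucket=$b")
      val bPart = spark.read.parquet(s"/tmp/B_bucketed/bucket=$b")
      aPart.join(bPart, Seq("id"))
        .write.mode("append").parquet("/tmp/joined_result")
    }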

  • I used a broadcast in my problem, but when I re-run it a few times (testing different conditions), I believe it gets stuck in memory somehow. How do I do it in such a way that I can reuse the previous broadcast, or just overwrite it? Thanks! – python_enthusiast Jul 18 '19 at 17:31
6

If I understand your question correctly, you want to use a broadcast join that replicates DataFrame B on every node so that the semi-join computation (i.e., using a join to filter id from DataFrame A) can compute independently on every node instead of having to communicate information back-and-forth between each other (i.e., shuffle join).

You can run join functions that explicitly call for a broadcast join to achieve what you're trying to do:

import org.apache.spark.sql.functions.broadcast

val joinExpr = A.col("id") === B.col("id")

val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")

You can run filtered_A.explain() to verify that a broadcast join is being used.

bshelt141