
I need to join tables using Spark SQL or the DataFrame API, and I'd like to know what an optimized way of achieving this would be.

The scenario is:

  1. All data is present in Hive in ORC format (the base DataFrame and the reference files).
  2. I need to join one base file (DataFrame) read from Hive with 11-13 other reference files to create a big in-memory structure (400 columns, around 1 TB in size).

What would be the best approach to achieve this? Please share your experience if someone has encountered a similar problem.

S. K

3 Answers


My default advice on how to optimize joins is:

  1. Use a broadcast join if you can (see this notebook). From your question it seems your tables are large and a broadcast join is not an option.

  2. Consider using a very large cluster (it's cheaper than you may think). $250 right now (6/2016) buys about 24 hours of 800 cores with 6 TB of RAM and many SSDs on the EC2 spot instance market. When thinking about the total cost of a big data solution, I find that humans tend to substantially undervalue their time.

  3. Use the same partitioner. See this question for information on co-grouped joins.

  4. If the data is huge and/or your clusters cannot grow, such that even (3) above leads to OOM, use a two-pass approach (see the sketch below). First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table.

Side note: I say "appending" above because in production I never use SaveMode.Append. It is not idempotent and that's a dangerous thing. I use SaveMode.Overwrite deep into the subtree of a partitioned table tree structure. Prior to 2.0.0 and 1.6.2 you'll have to delete _SUCCESS or metadata files or dynamic partition discovery will choke.
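In case it is useful, here is a minimal sketch of point (4) in Spark/Scala, using SaveMode.Overwrite as per the side note above. The integer join column id, the 256-way split, and the table/path names are placeholders assumed for illustration, not details from the question.

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
import spark.implicits._

// Pass 1: derive a coarse partition key from the (assumed integer) join column
// and persist each side as a partitioned parquet dataset.
def persistByParId(df: DataFrame, path: String): DataFrame = {
  df.withColumn("par_id", col("id") % 256)
    .repartition(256, col("par_id"))
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("par_id")
    .parquet(path)
  spark.read.parquet(path)
}

val left  = persistByParId(spark.table("db.base_table"), "/tmp/left_by_par_id")
val right = persistByParId(spark.table("db.ref_table"), "/tmp/right_by_par_id")

// Pass 2: join one sub-partition at a time, overwriting that slice of the
// result's partition tree rather than appending to the whole table.
val parIds = left.select($"par_id").distinct.as[Int].collect()
for (p <- parIds) {
  left.filter($"par_id" === p)
    .join(right.filter($"par_id" === p), Seq("id"))
    .write
    .mode(SaveMode.Overwrite)
    .parquet(s"/tmp/joined/par_id=$p")
}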

Hope this helps.

Sim
  • Thanks will think on these lines. – S. K Jun 16 '16 at 09:15
  • Just curiosity, is this 6TB+800 core one server on AWS? – user650749 Mar 23 '17 at 20:24
  • It was a cluster, not a single server. Yes, on AWS. – Sim Mar 24 '17 at 22:07
  • @Sim, I would be grateful if you could explain your fourth point with an example (join sub-partitions serially in a loop, "appending" to the same final result table). Could you please elaborate by taking two dataframes as an example and showing how to loop in passes? – vikrant rana Nov 26 '18 at 20:12
  • @vikrantrana I don't have the bandwidth to do this but the basic idea is simple: a large USING join can be broken into a union of smaller joins of subsets of the join column space. You build the subsets by applying consistent partitioning to both the left and right side of the join. For example, if you are joining on an integer ID, you can partition by the ID modulo some number, e.g., `df.withColumn("par_id", $"id" % 256).repartition(256, 'par_id).write.partitionBy("par_id")...` Then iterate over `persisted.select('par_id).distinct.collect`, joining each partition and persisting again. Then union. – Sim Nov 28 '18 at 03:35
  • Thanks @Sim for your kind help. I will give it a try. – vikrant rana Nov 28 '18 at 03:58
  • @Sim. Based on your explanation, I was able to make some code and I think it's working fine. I have also explained it as an answer to another user; here is the link: https://stackoverflow.com/questions/53524062/efficient-pyspark-join/53720497#53720497 . Am I doing it the right way? Could you please check and let me know? – vikrant rana Dec 11 '18 at 08:58
  • @vikrantrana LGTM – Sim Dec 11 '18 at 22:16
  • @Sim. Thanks for spending your valuable time and guidance. – vikrant rana Dec 12 '18 at 03:48
  • _serially_ . For real? I'm joining a billion rows to a billion rows. Why would _anything_ be done serially in that case? See the other answer about `bucketed joins` for a better approach – WestCoastProjects Nov 15 '22 at 02:01
  • @WestCoastProjects a billion x billion join is not a "huge" join by Spark standards, not by a long shot. Separately, bucketing doesn't work well with modern Spark persistence, e.g., Delta, while partitioning does. Assuming data can be persisted and will be used repeatedly for joins on the same key(s), Delta with indexing by the join key could help as well (I wrote the original answer back in 2016 when Delta wasn't an option). – Sim Nov 16 '22 at 10:52
  • @sim It is exactly due to `Delta` that I am researching this: it breaks bucketing. it is for versioning not performance and afaict has "no answer" for the ensuing performance degradation. It appears to be "one or the other": large scale performance *or* `Delta` so my assumption is we will need to handle large scale analytics via non-`Delta` tables. – WestCoastProjects Nov 16 '22 at 16:15

Spark uses SortMerge joins to join large tables. This consists of hashing each row of both tables on the join keys and shuffling rows with the same hash into the same partition. There, the keys are sorted on both sides and the sort-merge algorithm is applied. That's the best approach as far as I know.

To drastically speed up your SortMerge joins, write your large datasets as Hive tables with the pre-bucketing and pre-sorting options (using the same number of buckets for both tables) instead of as flat parquet datasets.

tableA
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")   
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_a")


tableB
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")    
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_b")

The overhead cost of writing pre-bucketed/pre-sorted tables is modest compared to the benefits.

The underlying dataset will still be parquet by default, but the Hive metastore (which can be the Glue metastore on AWS) will contain precious information about how the table is structured. Because all possible "joinable" rows are colocated, Spark won't shuffle tables that are pre-bucketed (big savings!) and won't sort the rows within the partitions of tables that are pre-sorted.

val joined = spark.table("my_db.table_a").join(spark.table("my_db.table_b"), Seq("A", "B"))

Look at the execution plan with and without pre-bucketing.
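For instance, a quick way to check on the joined DataFrame from the snippet above:

// If the bucketing metadata is picked up, the physical plan should show a
// SortMergeJoin with no Exchange (shuffle) under either side of the join.
joined.explain()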

This will not only save you a lot of time during your joins, it will also make it possible to run very large joins on a relatively small cluster without OOMs. At Amazon, we use this in prod most of the time (there are still a few cases where it is not required).

To know more about pre-bucketing/pre-sorting:

Boris
  • The concept is correct: using `bucketed tables`. But the above is *not* hive. Spark bucketing is incompatible with hive bucketing. btw Have you done any performance/load tests on this approach? – WestCoastProjects Nov 15 '22 at 01:59
  • "Spark bucketing is incompatible with hive bucketing" check his 4th link: https://www.databricks.com/session/hive-bucketing-in-apache-spark and https://issues.apache.org/jira/browse/SPARK-19256 – Kushagra Verma Mar 01 '23 at 20:26
  • why do you repartition before bucketing? – Nebi M Aydin Apr 01 '23 at 22:25

Partition the source using hash partitioning or range partitioning, or write custom partitioning if you know the joining fields well. Partitioning will help avoid repartitioning during joins, since Spark data from the same partition across tables will exist in the same location. ORC will definitely help the cause. If this is still causing spill, try using Tachyon, which will be faster than disk.
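For what it's worth, here is a rough sketch of that idea with the DataFrame API; the table names, the join column key, and the 400 partitions are assumptions for illustration, not details from the question. Hash-partitioning both sides on the join key with the same number of partitions lets the sort-merge join reuse that layout instead of shuffling each side again.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.enableHiveSupport().getOrCreate()
import spark.implicits._

// The repartition calls are themselves shuffles, but afterwards both sides are
// hash-partitioned on the join key, so no extra Exchange is needed for the join.
val base = spark.table("db.base_table").repartition(400, $"key")
val ref  = spark.table("db.ref_table").repartition(400, $"key")

val joined = base.join(ref, Seq("key"))
joined.explain()  // verify: no additional Exchange directly above the SortMergeJoin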

Hari