
There is a Spark SQL job:

spark.sql(s"""SELECT *
  FROM (
  select * from default.table1
  where
  created_dt between date '2018-01-01' and '2018-01-02'
  group by 1,2) table11, -- about 100,000,000 records
  default.table2 table22,-- about 600,000,000 records
  default.table3 table33,-- about 3000,000,000 records
  default.table4 table44-- about 100,000,000 records
  WHERE table22.item_id = table11.item_id
  AND hot.item_site_id  IN (SELECT SITE_ID FROM default.table5)
  AND table22.item_id = table33.item_id
  AND table22.end_dt = table33.end_dt
  AND table22.end_dt >= date '2018-01-01' - interval '180' day
  LIMIT 10000""")
  .collect()
  //.map(t => "Id: " + t(0))
  .foreach(println)

In this job, four Hive tables need to be joined on item_id, end_dt, and other fields. As the inline comments show, the tables hold roughly 100 million to 3 billion records each.

How can this join be optimized? For example, if each table were partitioned, would performance improve significantly? Thanks

BAE

1 Answer


There are a number of strategies for optimizing Spark joins. Many are outlined in this Spark Summit presentation. You can find more details about optimizing SortMergeJoin performance here.
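For example, when one side of a join or an IN subquery is small (as default.table5 with its site IDs may well be here), broadcasting it is often the single biggest win. A minimal sketch in the DataFrame API, assuming the spark session from the question, that default.table5 fits in executor memory, and that item_site_id is a column of default.table2:

import org.apache.spark.sql.functions.broadcast

// Broadcast the small site-ID table so the IN/semi-join is executed as a
// BroadcastHashJoin instead of shuffling the ~600M-row table2.
val sites  = spark.table("default.table5").select("SITE_ID")
val table2 = spark.table("default.table2")

val table2OnSites = table2.join(
  broadcast(sites),
  table2("item_site_id") === sites("SITE_ID"),
  "left_semi")

Recent Spark versions also accept broadcast hints directly in the SQL text, but the broadcast() function above is the most portable way to express the same intent.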

Note that sort merge joins can operate very efficiently on already sorted data. One way to get data in the right form is to save it as a bucketized table with the data within each bucket sorted (df.write.bucketBy(n, "x").sortBy("x")). The table metastore will preserve information about the bucketing, which can be used by the query optimizer later. Note that this will NOT work if you save to a path, unless you are using something like Databricks Delta.
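As a sketch of that approach (the bucket count, the output table names, and the choice of item_id as the bucketing/sorting key are illustrative assumptions, not something from the question):

// Rewrite the two largest tables once as bucketed, sorted Hive tables.
// Both sides must use the same number of buckets on the join key for the
// later shuffle to be avoided; 200 is just an example value.
spark.table("default.table2")
  .write
  .bucketBy(200, "item_id")
  .sortBy("item_id")
  .saveAsTable("default.table2_bucketed")

spark.table("default.table3")
  .write
  .bucketBy(200, "item_id")
  .sortBy("item_id")
  .saveAsTable("default.table3_bucketed")

// A subsequent join on item_id can then be planned as a SortMergeJoin
// without shuffling either side, because the bucketing metadata is read
// back from the metastore.
val joined = spark.table("default.table2_bucketed")
  .join(spark.table("default.table3_bucketed"), Seq("item_id"))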

Beyond this, you may want to look at my answer to What is an optimized way of joining large tables in Spark SQL.

Sim